For Prometheus Input add ability to query Consul Service catalog (#5464)

This commit is contained in:
Martin Molnar 2021-07-27 23:23:01 +02:00 committed by GitHub
parent f241f91112
commit 1a42c7d289
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 280 additions and 11 deletions

View File

@ -59,6 +59,19 @@ in Prometheus format.
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true
# agent = "http://localhost:8500"
# query_interval = "5m"
# [[inputs.prometheus.consul.query]]
# name = "a service name"
# tag = "a service tag"
# url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}'
# [inputs.prometheus.consul.query.tags]
# host = "{{.Node}}"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
@ -117,6 +130,26 @@ env:
If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should updated. If not specified, the default is 60 seconds.
#### Consul Service Discovery
Enabling this option and configuring consul `agent` url will allow the plugin to query
consul catalog for available services. Using `query_interval` the plugin will periodically
query the consul catalog for services with `name` and `tag` and refresh the list of scraped urls.
It can use the information from the catalog to build the scraped url and additional tags from a template.
Multiple consul queries can be configured, each for different service.
The following example fields can be used in url or tag templates:
* Node
* Address
* NodeMeta
* ServicePort
* ServiceAddress
* ServiceTags
* ServiceMeta
For full list of available fields and their type see struct CatalogService in
https://github.com/hashicorp/consul/blob/master/api/catalog.go
#### Bearer Token
If set, the file specified by the `bearer_token` parameter will be read on

View File

@ -0,0 +1,208 @@
package prometheus
import (
"bytes"
"context"
"fmt"
"net/url"
"strings"
"text/template"
"time"
"github.com/hashicorp/consul/api"
"github.com/influxdata/telegraf/config"
)
type ConsulConfig struct {
// Address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port").
Enabled bool `toml:"enabled"`
Agent string `toml:"agent"`
QueryInterval config.Duration `toml:"query_interval"`
Queries []*ConsulQuery `toml:"query"`
}
// One Consul service discovery query
type ConsulQuery struct {
// A name of the searched services (not ID)
ServiceName string `toml:"name"`
// A tag of the searched services
ServiceTag string `toml:"tag"`
// A DC of the searched services
ServiceDc string `toml:"dc"`
// A template URL of the Prometheus gathering interface. The hostname part
// of the URL will be replaced by discovered address and port.
ServiceURL string `toml:"url"`
// Extra tags to add to metrics found in Consul
ServiceExtraTags map[string]string `toml:"tags"`
serviceURLTemplate *template.Template
serviceExtraTagsTemplate map[string]*template.Template
// Store last error status and change log level depending on repeated occurence
lastQueryFailed bool
}
func (p *Prometheus) startConsul(ctx context.Context) error {
consulAPIConfig := api.DefaultConfig()
if p.ConsulConfig.Agent != "" {
consulAPIConfig.Address = p.ConsulConfig.Agent
}
consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return fmt.Errorf("cannot connect to the Consul agent: %v", err)
}
// Parse the template for metrics URL, drop queries with template parse errors
i := 0
for _, q := range p.ConsulConfig.Queries {
serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL)
if err != nil {
p.Log.Errorf("Could not parse the Consul query URL template (%s), skipping it. Error: %s", q.ServiceURL, err)
continue
}
q.serviceURLTemplate = serviceURLTemplate
// Allow to use join function in tags
templateFunctions := template.FuncMap{"join": strings.Join}
// Parse the tag value templates
q.serviceExtraTagsTemplate = make(map[string]*template.Template)
for tagName, tagTemplateString := range q.ServiceExtraTags {
tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
if err != nil {
p.Log.Errorf("Could not parse the Consul query Extra Tag template (%s), skipping it. Error: %s", tagTemplateString, err)
continue
}
q.serviceExtraTagsTemplate[tagName] = tagTemplate
}
p.ConsulConfig.Queries[i] = q
i++
}
// Prevent memory leak by erasing truncated values
for j := i; j < len(p.ConsulConfig.Queries); j++ {
p.ConsulConfig.Queries[j] = nil
}
p.ConsulConfig.Queries = p.ConsulConfig.Queries[:i]
catalog := consul.Catalog()
p.wg.Add(1)
go func() {
// Store last error status and change log level depending on repeated occurence
var refreshFailed = false
defer p.wg.Done()
err := p.refreshConsulServices(catalog)
if err != nil {
refreshFailed = true
p.Log.Errorf("Unable to refreh Consul services: %v", err)
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(p.ConsulConfig.QueryInterval)):
err := p.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refreh Consul services: %v", err)
if refreshFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
p.Log.Info("Successfully refreshed Consul services after previous errors")
}
}
}
}()
return nil
}
func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
consulServiceURLs := make(map[string]URLAndAddress)
p.Log.Debugf("Refreshing Consul services")
for _, q := range p.ConsulConfig.Queries {
queryOptions := api.QueryOptions{}
if q.ServiceDc != "" {
queryOptions.Datacenter = q.ServiceDc
}
// Request services from Consul
consulServices, _, err := c.Service(q.ServiceName, q.ServiceTag, &queryOptions)
if err != nil {
return err
}
if len(consulServices) == 0 {
p.Log.Debugf("Queried Consul for Service (%s, %s) but did not find any instances", q.ServiceName, q.ServiceTag)
continue
}
p.Log.Debugf("Queried Consul for Service (%s, %s) and found %d instances", q.ServiceName, q.ServiceTag, len(consulServices))
for _, consulService := range consulServices {
uaa, err := p.getConsulServiceURL(q, consulService)
if err != nil {
message := fmt.Sprintf("Unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
if q.lastQueryFailed {
p.Log.Debug(message)
} else {
p.Log.Warn(message)
}
q.lastQueryFailed = true
break
}
if q.lastQueryFailed {
p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag)
}
q.lastQueryFailed = false
p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
}
}
p.lock.Lock()
p.consulServices = consulServiceURLs
p.lock.Unlock()
return nil
}
func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) {
var buffer bytes.Buffer
buffer.Reset()
err := q.serviceURLTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
serviceURL, err := url.Parse(buffer.String())
if err != nil {
return nil, err
}
extraTags := make(map[string]string)
for tagName, tagTemplate := range q.serviceExtraTagsTemplate {
buffer.Reset()
err = tagTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
extraTags[tagName] = buffer.String()
}
p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String())
return &URLAndAddress{
URL: serviceURL,
OriginalURL: serviceURL,
Tags: extraTags,
}, nil
}

View File

@ -12,7 +12,6 @@ import (
"net/url"
"os/user"
"path/filepath"
"sync"
"time"
"github.com/ghodss/yaml"
@ -55,7 +54,7 @@ func loadClient(kubeconfigPath string) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(&config)
}
func (p *Prometheus) start(ctx context.Context) error {
func (p *Prometheus) startK8s(ctx context.Context) error {
config, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to get InClusterConfig - %v", err)
@ -77,8 +76,6 @@ func (p *Prometheus) start(ctx context.Context) error {
}
}
p.wg = sync.WaitGroup{}
p.wg.Add(1)
go func() {
defer p.wg.Done()

View File

@ -41,6 +41,9 @@ type Prometheus struct {
// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
// Consul SD configuration
ConsulConfig ConsulConfig `toml:"consul"`
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
@ -77,6 +80,9 @@ type Prometheus struct {
podLabelSelector labels.Selector
podFieldSelector fields.Selector
isNodeScrapeScope bool
// List of consul services to scrape
consulServices map[string]URLAndAddress
}
var sampleConfig = `
@ -127,6 +133,19 @@ var sampleConfig = `
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true
# agent = "http://localhost:8500"
# query_interval = "5m"
# [[inputs.prometheus.consul.query]]
# name = "a service name"
# tag = "a service tag"
# url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}'
# [inputs.prometheus.consul.query.tags]
# host = "{{.Node}}"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
@ -238,6 +257,10 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
p.lock.Lock()
defer p.lock.Unlock()
// add all services collected from consul
for k, v := range p.consulServices {
allURLs[k] = v
}
// loop through all pods scraped via the prometheus annotation on the pods
for k, v := range p.kubernetesPods {
allURLs[k] = v
@ -463,20 +486,27 @@ func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) {
return true, ""
}
// Start will start the Kubernetes scraping if enabled in the configuration
// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration
func (p *Prometheus) Start(_ telegraf.Accumulator) error {
var ctx context.Context
p.wg = sync.WaitGroup{}
ctx, p.cancel = context.WithCancel(context.Background())
if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 {
if err := p.startConsul(ctx); err != nil {
return err
}
}
if p.MonitorPods {
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
if err := p.startK8s(ctx); err != nil {
return err
}
}
return nil
}
func (p *Prometheus) Stop() {
if p.MonitorPods {
p.cancel()
}
p.cancel()
p.wg.Wait()
}
@ -485,6 +515,7 @@ func init() {
return &Prometheus{
ResponseTimeout: config.Duration(time.Second * 3),
kubernetesPods: map[string]URLAndAddress{},
consulServices: map[string]URLAndAddress{},
URLTag: "url",
}
})