diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index d8ae39e4f..36178e543 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/inputs/conntrack" _ "github.com/influxdata/telegraf/plugins/inputs/consul" + _ "github.com/influxdata/telegraf/plugins/inputs/consul_metrics" _ "github.com/influxdata/telegraf/plugins/inputs/couchbase" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/cpu" diff --git a/plugins/inputs/consul_metrics/README.md b/plugins/inputs/consul_metrics/README.md new file mode 100644 index 000000000..bbdcd4ec7 --- /dev/null +++ b/plugins/inputs/consul_metrics/README.md @@ -0,0 +1,34 @@ +# Hashicorp Consul Metrics Input Plugin + +This plugin grabs metrics from a Consul agent. Telegraf may be present in every node and connect to the agent locally. In this case should be something like `http://127.0.0.1:8500`. + +> Tested on Consul 1.10.4 + +## Configuration + +```toml +[[inputs.consul]] + ## URL for the Consul agent + # url = "http://127.0.0.1:8500" + + ## Use auth token for authorization. + ## If both are set, an error is thrown. + ## If both are empty, no token will be used. + # token_file = "/path/to/auth/token" + ## OR + # token = "a1234567-40c7-9048-7bae-378687048181" + + ## Set response_timeout (default 5 seconds) + # timeout = "5s" + + ## Optional TLS Config + # tls_ca = /path/to/cafile + # tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile +``` + +## Metrics + +Consul collects various metrics. For every details, please have a look at Consul following documentation: + +- [https://www.consul.io/api/agent#view-metrics](https://www.consul.io/api/agent#view-metrics) diff --git a/plugins/inputs/consul_metrics/consul_metrics.go b/plugins/inputs/consul_metrics/consul_metrics.go new file mode 100644 index 000000000..3a2dbce5c --- /dev/null +++ b/plugins/inputs/consul_metrics/consul_metrics.go @@ -0,0 +1,196 @@ +package consul_metrics + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Consul_metrics configuration object +type ConsulMetrics struct { + URL string `toml:"url"` + + TokenFile string `toml:"token_file"` + Token string `toml:"token"` + + ResponseTimeout config.Duration `toml:"timeout"` + + tls.ClientConfig + + roundTripper http.RoundTripper +} + +const timeLayout = "2006-01-02 15:04:05 -0700 MST" + +const sampleConfig = ` + ## URL for the Consul agent + # url = "http://127.0.0.1:8500" + + ## Use auth token for authorization. + ## Only one of the options can be set. Leave empty to not use any token. + # token_file = "/path/to/auth/token" + ## OR + # token = "a1234567-40c7-9048-7bae-378687048181" + + ## Set timeout (default 5 seconds) + # timeout = "5s" + + ## Optional TLS Config + # tls_ca = /path/to/cafile + # tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile +` + +func init() { + inputs.Add("consul_metrics", func() telegraf.Input { + return &ConsulMetrics{ + ResponseTimeout: config.Duration(5 * time.Second), + } + }) +} + +// SampleConfig returns a sample config +func (n *ConsulMetrics) SampleConfig() string { + return sampleConfig +} + +// Description returns a description of the plugin +func (n *ConsulMetrics) Description() string { + return "Read metrics from the Consul API" +} + +func (n *ConsulMetrics) Init() error { + if n.URL == "" { + n.URL = "http://127.0.0.1:8500" + } + + if n.TokenFile != "" && n.Token != "" { + return errors.New("config error: both token_file and token are set") + } + + if n.TokenFile != "" { + token, err := os.ReadFile(n.TokenFile) + if err != nil { + return fmt.Errorf("reading file failed: %v", err) + } + n.Token = strings.TrimSpace(string(token)) + } + + tlsCfg, err := n.ClientConfig.TLSConfig() + if err != nil { + return fmt.Errorf("setting up TLS configuration failed: %v", err) + } + + n.roundTripper = &http.Transport{ + TLSHandshakeTimeout: time.Duration(n.ResponseTimeout), + TLSClientConfig: tlsCfg, + ResponseHeaderTimeout: time.Duration(n.ResponseTimeout), + } + + return nil +} + +// Gather, collects metrics from Consul endpoint +func (n *ConsulMetrics) Gather(acc telegraf.Accumulator) error { + summaryMetrics, err := n.loadJSON(n.URL + "/v1/agent/metrics") + if err != nil { + return err + } + + return buildConsulMetrics(acc, summaryMetrics) +} + +func (n *ConsulMetrics) loadJSON(url string) (*MetricsInfo, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", "X-Consul-Token "+n.Token) + req.Header.Add("Accept", "application/json") + + resp, err := n.roundTripper.RoundTrip(req) + if err != nil { + return nil, fmt.Errorf("error making HTTP request to %s: %s", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s returned HTTP status %s", url, resp.Status) + } + + var metrics MetricsInfo + err = json.NewDecoder(resp.Body).Decode(&metrics) + if err != nil { + return nil, fmt.Errorf("error parsing json response: %s", err) + } + + return &metrics, nil +} + +// buildConsulMetrics, it builds all the metrics and adds them to the accumulator) +func buildConsulMetrics(acc telegraf.Accumulator, metricsInfo *MetricsInfo) error { + t, err := time.Parse(timeLayout, metricsInfo.Timestamp) + if err != nil { + return fmt.Errorf("error parsing time: %s", err) + } + + for _, counters := range metricsInfo.Counters { + fields := map[string]interface{}{ + "count": counters.Count, + "sum": counters.Sum, + "max": counters.Max, + "mean": counters.Mean, + "min": counters.Min, + "rate": counters.Rate, + "stddev": counters.Stddev, + } + tags := counters.Labels + + acc.AddCounter(counters.Name, fields, tags, t) + } + + for _, gauges := range metricsInfo.Gauges { + fields := map[string]interface{}{ + "value": gauges.Value, + } + tags := gauges.Labels + + acc.AddGauge(gauges.Name, fields, tags, t) + } + + for _, points := range metricsInfo.Points { + fields := map[string]interface{}{ + "value": points.Points, + } + tags := make(map[string]string) + + acc.AddFields(points.Name, fields, tags, t) + } + + for _, samples := range metricsInfo.Samples { + fields := map[string]interface{}{ + "count": samples.Count, + "sum": samples.Sum, + "max": samples.Max, + "mean": samples.Mean, + "min": samples.Min, + "rate": samples.Rate, + "stddev": samples.Stddev, + } + tags := samples.Labels + + acc.AddCounter(samples.Name, fields, tags, t) + } + + return nil +} diff --git a/plugins/inputs/consul_metrics/consul_metrics_test.go b/plugins/inputs/consul_metrics/consul_metrics_test.go new file mode 100644 index 000000000..417bf52d1 --- /dev/null +++ b/plugins/inputs/consul_metrics/consul_metrics_test.go @@ -0,0 +1,97 @@ +package consul_metrics + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConsulStats(t *testing.T) { + var applyTests = []struct { + name string + expected []telegraf.Metric + }{ + { + name: "Metrics", + expected: []telegraf.Metric{ + testutil.MustMetric( + "consul.rpc.request", + map[string]string{}, + map[string]interface{}{ + "count": int(5), + "max": float64(1), + "mean": float64(1), + "min": float64(1), + "rate": float64(0.5), + "stddev": float64(0), + "sum": float64(5), + }, + time.Unix(1639218930, 0), + 1, + ), + testutil.MustMetric( + "consul.consul.members.clients", + map[string]string{ + "datacenter": "dc1", + }, + map[string]interface{}{ + "value": float64(0), + }, + time.Unix(1639218930, 0), + 2, + ), + testutil.MustMetric( + "consul.api.http", + map[string]string{ + "method": "GET", + "path": "v1_agent_self", + }, + map[string]interface{}{ + "count": int(1), + "max": float64(4.14815616607666), + "mean": float64(4.14815616607666), + "min": float64(4.14815616607666), + "rate": float64(0.414815616607666), + "stddev": float64(0), + "sum": float64(4.14815616607666), + }, + time.Unix(1639218930, 0), + 1, + ), + }, + }, + } + + for _, tt := range applyTests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI == "/v1/agent/metrics" { + w.WriteHeader(http.StatusOK) + responseKeyMetrics, _ := ioutil.ReadFile("testdata/response_key_metrics.json") + _, err := fmt.Fprintln(w, string(responseKeyMetrics)) + require.NoError(t, err) + } + })) + defer ts.Close() + + plugin := &ConsulMetrics{ + URL: ts.URL, + } + err := plugin.Init() + require.NoError(t, err) + + acc := testutil.Accumulator{} + err = plugin.Gather(&acc) + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) + }) + } +} diff --git a/plugins/inputs/consul_metrics/consul_structs.go b/plugins/inputs/consul_metrics/consul_structs.go new file mode 100644 index 000000000..c4585329b --- /dev/null +++ b/plugins/inputs/consul_metrics/consul_structs.go @@ -0,0 +1,32 @@ +package consul_metrics + +type MetricsInfo struct { + Timestamp string + Gauges []GaugeValue + Points []PointValue + Counters []SampledValue + Samples []SampledValue +} + +type GaugeValue struct { + Name string + Value float32 + Labels map[string]string +} + +type PointValue struct { + Name string + Points []float32 +} + +type SampledValue struct { + Name string + Count int + Sum float64 + Min float64 + Max float64 + Mean float64 + Rate float64 + Stddev float64 + Labels map[string]string +} diff --git a/plugins/inputs/consul_metrics/testdata/response_key_metrics.json b/plugins/inputs/consul_metrics/testdata/response_key_metrics.json new file mode 100644 index 000000000..0234d17f4 --- /dev/null +++ b/plugins/inputs/consul_metrics/testdata/response_key_metrics.json @@ -0,0 +1,42 @@ +{ + "Timestamp": "2021-12-11 10:35:30 +0000 UTC", + "Gauges": [ + { + "Name": "consul.consul.members.clients", + "Value": 0, + "Labels": { + "datacenter": "dc1" + } + } + ], + "Points": [], + "Counters": [ + { + "Name": "consul.rpc.request", + "Count": 5, + "Rate": 0.5, + "Sum": 5, + "Min": 1, + "Max": 1, + "Mean": 1, + "Stddev": 0, + "Labels": {} + } + ], + "Samples": [ + { + "Name": "consul.api.http", + "Count": 1, + "Rate": 0.414815616607666, + "Sum": 4.14815616607666, + "Min": 4.14815616607666, + "Max": 4.14815616607666, + "Mean": 4.14815616607666, + "Stddev": 0, + "Labels": { + "method": "GET", + "path": "v1_agent_self" + } + } + ] + }