From de66a2f9aa29fe7e1691a406dd860ed0c5e70cfc Mon Sep 17 00:00:00 2001 From: tguenneguez <91618382+tguenneguez@users.noreply.github.com> Date: Mon, 12 Feb 2024 21:55:02 +0100 Subject: [PATCH] feat(inputs.prometheus): Add internal metrics (#14424) --- plugins/inputs/prometheus/README.md | 13 ++ plugins/inputs/prometheus/prometheus.go | 54 +++++-- plugins/inputs/prometheus/prometheus_test.go | 150 ++++++++++++++++-- plugins/inputs/prometheus/sample.conf | 3 + .../prometheus_client_v1_test.go | 7 +- .../prometheus_client_v2_test.go | 5 +- 6 files changed, 203 insertions(+), 29 deletions(-) diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index d77ea8cad..ef7eb8690 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -166,6 +166,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## enable TLS only if any of the other options are specified. # tls_enable = true + ## This option allows you to report the status of prometheus requests. + # enable_request_metrics = false + ## Control pod scraping based on pod namespace annotations ## Pass and drop here act like tagpass and tagdrop, but instead ## of filtering metrics they filters pod candidates for scraping @@ -356,6 +359,14 @@ All metrics receive the `url` tag indicating the related URL specified in the Telegraf configuration. If using Kubernetes service discovery the `address` tag is also added indicating the discovered ip address. +* prometheus_request + * tags: + * url + * address + * fields: + * response_time (float, seconds) + * content_length (int, response body length) + ## Example Output ### Source @@ -390,6 +401,7 @@ cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430 cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000 cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000 cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000 +prometheus_request,result=success,url=http://example.org:9273/metrics content_length=179013i,http_response_code=200i,response_time=0.051521601 1505776751000000000 ``` ### Output (when metric_version = 2) @@ -406,6 +418,7 @@ prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.5136226 prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000 prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000 prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000 +prometheus_request,result=success,url=http://example.org:9273/metrics content_length=179013i,http_response_code=200i,response_time=0.051521601 1505776751000000000 ``` ### Output with timestamp included diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 5e2186d81..6775b7585 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -75,8 +75,9 @@ type Prometheus struct { HTTPHeaders map[string]string `toml:"http_headers"` - ResponseTimeout config.Duration `toml:"response_timeout" deprecated:"1.26.0;use 'timeout' instead"` - ContentLengthLimit config.Size `toml:"content_length_limit"` + ResponseTimeout config.Duration `toml:"response_timeout" deprecated:"1.26.0;use 'timeout' instead"` + ContentLengthLimit config.Size `toml:"content_length_limit"` + EnableRequestMetrics bool `toml:"enable_request_metrics"` MetricVersion int `toml:"metric_version"` @@ -351,7 +352,13 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serviceURL URLAndAddress) { defer wg.Done() - acc.AddError(p.gatherURL(serviceURL, acc)) + requestFields, tags, err := p.gatherURL(serviceURL, acc) + acc.AddError(err) + + // Add metrics + if p.EnableRequestMetrics { + acc.AddFields("prometheus_request", requestFields, tags) + } }(URL) } @@ -360,9 +367,21 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { return nil } -func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error { +func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) { var req *http.Request var uClient *http.Client + requestFields := make(map[string]interface{}) + tags := map[string]string{} + if p.URLTag != "" { + tags[p.URLTag] = u.OriginalURL.String() + } + if u.Address != "" { + tags["address"] = u.Address + } + for k, v := range u.Tags { + tags[k] = v + } + if u.URL.Scheme == "unix" { path := u.URL.Query().Get("path") if path == "" { @@ -373,7 +392,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error addr := "http://localhost" + path req, err = http.NewRequest("GET", addr, nil) if err != nil { - return fmt.Errorf("unable to create new request %q: %w", addr, err) + return nil, nil, fmt.Errorf("unable to create new request %q: %w", addr, err) } // ignore error because it's been handled before getting here @@ -398,7 +417,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error var err error req, err = http.NewRequest("GET", u.URL.String(), nil) if err != nil { - return fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err) + return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err) } } @@ -407,7 +426,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error if p.BearerToken != "" { token, err := os.ReadFile(p.BearerToken) if err != nil { - return err + return nil, nil, err } req.Header.Set("Authorization", "Bearer "+string(token)) } else if p.BearerTokenString != "" { @@ -422,20 +441,26 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error var err error var resp *http.Response + var start time.Time if u.URL.Scheme != "unix" { + start = time.Now() //nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer` resp, err = p.client.Do(req) } else { + start = time.Now() //nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer` resp, err = uClient.Do(req) } + end := time.Since(start).Seconds() if err != nil { - return fmt.Errorf("error making HTTP request to %q: %w", u.URL, err) + return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.URL, err) } + requestFields["response_time"] = end + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status) + return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status) } var body []byte @@ -449,19 +474,20 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error body, err = io.ReadAll(lr) if err != nil { - return fmt.Errorf("error reading body: %w", err) + return requestFields, tags, fmt.Errorf("error reading body: %w", err) } if int64(len(body)) > limit { p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.URL, limit) - return nil + return requestFields, tags, nil } } else { body, err = io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("error reading body: %w", err) + return requestFields, tags, fmt.Errorf("error reading body: %w", err) } } + requestFields["content_length"] = len(body) // Parse the metrics metricParser := parser.Parser{ Header: resp.Header, @@ -470,7 +496,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error } metrics, err := metricParser.Parse(body) if err != nil { - return fmt.Errorf("error reading metrics for %q: %w", u.URL, err) + return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.URL, err) } for _, metric := range metrics { @@ -501,7 +527,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error } } - return nil + return requestFields, tags, nil } func (p *Prometheus) addHeaders(req *http.Request) { diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 0ecfa621a..fe6bc56e7 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -75,7 +75,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { require.True(t, acc.HasFloatField("test_metric", "value")) require.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) require.False(t, acc.HasTag("test_metric", "address")) - require.Equal(t, acc.TagValue("test_metric", "url"), ts.URL+"/metrics") + require.Equal(t, ts.URL+"/metrics", acc.TagValue("test_metric", "url")) } func TestPrometheusCustomHeader(t *testing.T) { @@ -163,8 +163,8 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { require.True(t, acc.HasFloatField("go_goroutines", "gauge")) require.True(t, acc.HasFloatField("test_metric", "value")) require.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) - require.Equal(t, acc.TagValue("test_metric", "address"), tsAddress) - require.Equal(t, acc.TagValue("test_metric", "url"), ts.URL) + require.Equal(t, tsAddress, acc.TagValue("test_metric", "address")) + require.Equal(t, ts.URL, acc.TagValue("test_metric", "url")) } func TestPrometheusWithTimestamp(t *testing.T) { @@ -199,7 +199,7 @@ test_counter{label="test"} 1 1685443805885` var acc testutil.Accumulator require.NoError(t, acc.GatherError(p.Gather)) - testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) + testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics()) } func TestPrometheusGeneratesMetricsAlthoughFirstDNSFailsIntegration(t *testing.T) { @@ -404,10 +404,11 @@ go_gc_duration_seconds_count 42` defer ts.Close() p := &Prometheus{ - Log: &testutil.Logger{}, - URLs: []string{ts.URL}, - URLTag: "", - MetricVersion: 2, + Log: &testutil.Logger{}, + URLs: []string{ts.URL}, + URLTag: "", + MetricVersion: 2, + EnableRequestMetrics: true, } err := p.Init() require.NoError(t, err) @@ -444,16 +445,24 @@ go_gc_duration_seconds_count 42` "prometheus", map[string]string{}, map[string]interface{}{ - "go_gc_duration_seconds_sum": 42.0, - "go_gc_duration_seconds_count": 42.0, - }, + "go_gc_duration_seconds_sum": float64(42.0), + "go_gc_duration_seconds_count": float64(42)}, time.Unix(0, 0), telegraf.Summary, ), + testutil.MustMetric( + "prometheus_request", + map[string]string{}, + map[string]interface{}{ + "content_length": int64(1), + "response_time": float64(0)}, + time.Unix(0, 0), + telegraf.Untyped, + ), } testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), - testutil.IgnoreTime(), testutil.SortMetrics()) + testutil.IgnoreTime(), testutil.SortMetrics(), testutil.IgnoreFields("content_length", "response_time")) } func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) { @@ -577,3 +586,120 @@ func TestInitConfigSelectors(t *testing.T) { require.NotNil(t, p.podLabelSelector) require.NotNil(t, p.podFieldSelector) } + +func TestPrometheusInternalOk(t *testing.T) { + prommetric := `# HELP test_counter A sample test counter. +# TYPE test_counter counter +test_counter{label="test"} 1 1685443805885` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := fmt.Fprintln(w, prommetric) + require.NoError(t, err) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + KubernetesServices: []string{ts.URL}, + EnableRequestMetrics: true, + } + require.NoError(t, p.Init()) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + tsAddress := u.Hostname() + + expected := []telegraf.Metric{ + metric.New( + "prometheus_request", + map[string]string{ + "address": tsAddress}, + map[string]interface{}{ + "content_length": int64(1), + "response_time": float64(0)}, + time.UnixMilli(0), + telegraf.Untyped, + ), + } + + var acc testutil.Accumulator + testutil.PrintMetrics(acc.GetTelegrafMetrics()) + + require.NoError(t, acc.GatherError(p.Gather)) + testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime()) +} + +func TestPrometheusInternalContentBadFormat(t *testing.T) { + prommetric := `# HELP test_counter A sample test counter. +# TYPE test_counter counter +Flag test` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := fmt.Fprintln(w, prommetric) + require.NoError(t, err) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + KubernetesServices: []string{ts.URL}, + EnableRequestMetrics: true, + } + require.NoError(t, p.Init()) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + tsAddress := u.Hostname() + + expected := []telegraf.Metric{ + metric.New( + "prometheus_request", + map[string]string{ + "address": tsAddress}, + map[string]interface{}{ + "content_length": int64(94), + "response_time": float64(0)}, + time.UnixMilli(0), + telegraf.Untyped, + ), + } + + var acc testutil.Accumulator + require.Error(t, acc.GatherError(p.Gather)) + testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime()) +} + +func TestPrometheusInternalNoWeb(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + KubernetesServices: []string{ts.URL}, + EnableRequestMetrics: true, + } + require.NoError(t, p.Init()) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + tsAddress := u.Hostname() + + expected := []telegraf.Metric{ + metric.New( + "prometheus_request", + map[string]string{ + "address": tsAddress}, + map[string]interface{}{ + "content_length": int64(94), + "response_time": float64(0)}, + time.UnixMilli(0), + telegraf.Untyped, + ), + } + + var acc testutil.Accumulator + testutil.PrintMetrics(acc.GetTelegrafMetrics()) + + require.Error(t, acc.GatherError(p.Gather)) + testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime()) +} diff --git a/plugins/inputs/prometheus/sample.conf b/plugins/inputs/prometheus/sample.conf index 51d853b28..3548dee95 100644 --- a/plugins/inputs/prometheus/sample.conf +++ b/plugins/inputs/prometheus/sample.conf @@ -149,6 +149,9 @@ ## enable TLS only if any of the other options are specified. # tls_enable = true + ## This option allows you to report the status of prometheus requests. + # enable_request_metrics = false + ## Control pod scraping based on pod namespace annotations ## Pass and drop here act like tagpass and tagdrop, but instead ## of filtering metrics they filters pod candidates for scraping diff --git a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go index 43923a0bc..08555ed1d 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "regexp" "strings" "testing" "time" @@ -356,6 +357,8 @@ cpu_time_idle{host="example.org"} 42 func TestRoundTripMetricVersion1(t *testing.T) { logger := testutil.Logger{Name: "outputs.prometheus_client"} + regxPattern := regexp.MustCompile(`.*prometheus_request_.*`) + tests := []struct { name string data []byte @@ -483,10 +486,10 @@ rpc_duration_seconds_count 2693 actual, err := io.ReadAll(resp.Body) require.NoError(t, err) - + current := regxPattern.ReplaceAllLiteralString(string(actual), "") require.Equal(t, strings.TrimSpace(string(tt.data)), - strings.TrimSpace(string(actual))) + strings.TrimSpace(current)) }) } } diff --git a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go index 5d4f13c85..7c187ce29 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go @@ -5,6 +5,7 @@ import ( "io" "net/http" "net/http/httptest" + "regexp" "strings" "testing" "time" @@ -386,6 +387,7 @@ cpu_time_idle{host="example.org"} 42 func TestRoundTripMetricVersion2(t *testing.T) { logger := testutil.Logger{Name: "outputs.prometheus_client"} + regxPattern := regexp.MustCompile(`.*prometheus_request_.*`) tests := []struct { name string data []byte @@ -514,9 +516,10 @@ rpc_duration_seconds_count 2693 actual, err := io.ReadAll(resp.Body) require.NoError(t, err) + current := regxPattern.ReplaceAllLiteralString(string(actual), "") require.Equal(t, strings.TrimSpace(string(tt.data)), - strings.TrimSpace(string(actual))) + strings.TrimSpace(current)) }) } }