feat(inputs.prometheus): Add internal metrics (#14424)

This commit is contained in:
tguenneguez 2024-02-12 21:55:02 +01:00 committed by GitHub
parent 8393d1b589
commit de66a2f9aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 203 additions and 29 deletions

View File

@ -166,6 +166,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## enable TLS only if any of the other options are specified.
# tls_enable = true
## This option allows you to report the status of prometheus requests.
# enable_request_metrics = false
## Control pod scraping based on pod namespace annotations
## Pass and drop here act like tagpass and tagdrop, but instead
## of filtering metrics they filters pod candidates for scraping
@ -356,6 +359,14 @@ All metrics receive the `url` tag indicating the related URL specified in the
Telegraf configuration. If using Kubernetes service discovery the `address`
tag is also added indicating the discovered ip address.
* prometheus_request
* tags:
* url
* address
* fields:
* response_time (float, seconds)
* content_length (int, response body length)
## Example Output
### Source
@ -390,6 +401,7 @@ cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430
cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000
cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000
cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000
prometheus_request,result=success,url=http://example.org:9273/metrics content_length=179013i,http_response_code=200i,response_time=0.051521601 1505776751000000000
```
### Output (when metric_version = 2)
@ -406,6 +418,7 @@ prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.5136226
prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000
prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000
prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000
prometheus_request,result=success,url=http://example.org:9273/metrics content_length=179013i,http_response_code=200i,response_time=0.051521601 1505776751000000000
```
### Output with timestamp included

View File

@ -75,8 +75,9 @@ type Prometheus struct {
HTTPHeaders map[string]string `toml:"http_headers"`
ResponseTimeout config.Duration `toml:"response_timeout" deprecated:"1.26.0;use 'timeout' instead"`
ContentLengthLimit config.Size `toml:"content_length_limit"`
ResponseTimeout config.Duration `toml:"response_timeout" deprecated:"1.26.0;use 'timeout' instead"`
ContentLengthLimit config.Size `toml:"content_length_limit"`
EnableRequestMetrics bool `toml:"enable_request_metrics"`
MetricVersion int `toml:"metric_version"`
@ -351,7 +352,13 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
wg.Add(1)
go func(serviceURL URLAndAddress) {
defer wg.Done()
acc.AddError(p.gatherURL(serviceURL, acc))
requestFields, tags, err := p.gatherURL(serviceURL, acc)
acc.AddError(err)
// Add metrics
if p.EnableRequestMetrics {
acc.AddFields("prometheus_request", requestFields, tags)
}
}(URL)
}
@ -360,9 +367,21 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) {
var req *http.Request
var uClient *http.Client
requestFields := make(map[string]interface{})
tags := map[string]string{}
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
}
for k, v := range u.Tags {
tags[k] = v
}
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
@ -373,7 +392,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
addr := "http://localhost" + path
req, err = http.NewRequest("GET", addr, nil)
if err != nil {
return fmt.Errorf("unable to create new request %q: %w", addr, err)
return nil, nil, fmt.Errorf("unable to create new request %q: %w", addr, err)
}
// ignore error because it's been handled before getting here
@ -398,7 +417,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
var err error
req, err = http.NewRequest("GET", u.URL.String(), nil)
if err != nil {
return fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err)
return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err)
}
}
@ -407,7 +426,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
if p.BearerToken != "" {
token, err := os.ReadFile(p.BearerToken)
if err != nil {
return err
return nil, nil, err
}
req.Header.Set("Authorization", "Bearer "+string(token))
} else if p.BearerTokenString != "" {
@ -422,20 +441,26 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
var err error
var resp *http.Response
var start time.Time
if u.URL.Scheme != "unix" {
start = time.Now()
//nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer`
resp, err = p.client.Do(req)
} else {
start = time.Now()
//nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer`
resp, err = uClient.Do(req)
}
end := time.Since(start).Seconds()
if err != nil {
return fmt.Errorf("error making HTTP request to %q: %w", u.URL, err)
return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.URL, err)
}
requestFields["response_time"] = end
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status)
return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status)
}
var body []byte
@ -449,19 +474,20 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
body, err = io.ReadAll(lr)
if err != nil {
return fmt.Errorf("error reading body: %w", err)
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
if int64(len(body)) > limit {
p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.URL, limit)
return nil
return requestFields, tags, nil
}
} else {
body, err = io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading body: %w", err)
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
}
requestFields["content_length"] = len(body)
// Parse the metrics
metricParser := parser.Parser{
Header: resp.Header,
@ -470,7 +496,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
}
metrics, err := metricParser.Parse(body)
if err != nil {
return fmt.Errorf("error reading metrics for %q: %w", u.URL, err)
return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.URL, err)
}
for _, metric := range metrics {
@ -501,7 +527,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
}
}
return nil
return requestFields, tags, nil
}
func (p *Prometheus) addHeaders(req *http.Request) {

View File

@ -75,7 +75,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
require.True(t, acc.HasFloatField("test_metric", "value"))
require.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
require.False(t, acc.HasTag("test_metric", "address"))
require.Equal(t, acc.TagValue("test_metric", "url"), ts.URL+"/metrics")
require.Equal(t, ts.URL+"/metrics", acc.TagValue("test_metric", "url"))
}
func TestPrometheusCustomHeader(t *testing.T) {
@ -163,8 +163,8 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
require.True(t, acc.HasFloatField("go_goroutines", "gauge"))
require.True(t, acc.HasFloatField("test_metric", "value"))
require.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
require.Equal(t, acc.TagValue("test_metric", "address"), tsAddress)
require.Equal(t, acc.TagValue("test_metric", "url"), ts.URL)
require.Equal(t, tsAddress, acc.TagValue("test_metric", "address"))
require.Equal(t, ts.URL, acc.TagValue("test_metric", "url"))
}
func TestPrometheusWithTimestamp(t *testing.T) {
@ -199,7 +199,7 @@ test_counter{label="test"} 1 1685443805885`
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(p.Gather))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics())
}
func TestPrometheusGeneratesMetricsAlthoughFirstDNSFailsIntegration(t *testing.T) {
@ -404,10 +404,11 @@ go_gc_duration_seconds_count 42`
defer ts.Close()
p := &Prometheus{
Log: &testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
Log: &testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
EnableRequestMetrics: true,
}
err := p.Init()
require.NoError(t, err)
@ -444,16 +445,24 @@ go_gc_duration_seconds_count 42`
"prometheus",
map[string]string{},
map[string]interface{}{
"go_gc_duration_seconds_sum": 42.0,
"go_gc_duration_seconds_count": 42.0,
},
"go_gc_duration_seconds_sum": float64(42.0),
"go_gc_duration_seconds_count": float64(42)},
time.Unix(0, 0),
telegraf.Summary,
),
testutil.MustMetric(
"prometheus_request",
map[string]string{},
map[string]interface{}{
"content_length": int64(1),
"response_time": float64(0)},
time.Unix(0, 0),
telegraf.Untyped,
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime(), testutil.SortMetrics())
testutil.IgnoreTime(), testutil.SortMetrics(), testutil.IgnoreFields("content_length", "response_time"))
}
func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
@ -577,3 +586,120 @@ func TestInitConfigSelectors(t *testing.T) {
require.NotNil(t, p.podLabelSelector)
require.NotNil(t, p.podFieldSelector)
}
func TestPrometheusInternalOk(t *testing.T) {
prommetric := `# HELP test_counter A sample test counter.
# TYPE test_counter counter
test_counter{label="test"} 1 1685443805885`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := fmt.Fprintln(w, prommetric)
require.NoError(t, err)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
}
require.NoError(t, p.Init())
u, err := url.Parse(ts.URL)
require.NoError(t, err)
tsAddress := u.Hostname()
expected := []telegraf.Metric{
metric.New(
"prometheus_request",
map[string]string{
"address": tsAddress},
map[string]interface{}{
"content_length": int64(1),
"response_time": float64(0)},
time.UnixMilli(0),
telegraf.Untyped,
),
}
var acc testutil.Accumulator
testutil.PrintMetrics(acc.GetTelegrafMetrics())
require.NoError(t, acc.GatherError(p.Gather))
testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime())
}
func TestPrometheusInternalContentBadFormat(t *testing.T) {
prommetric := `# HELP test_counter A sample test counter.
# TYPE test_counter counter
<body>Flag test</body>`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := fmt.Fprintln(w, prommetric)
require.NoError(t, err)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
}
require.NoError(t, p.Init())
u, err := url.Parse(ts.URL)
require.NoError(t, err)
tsAddress := u.Hostname()
expected := []telegraf.Metric{
metric.New(
"prometheus_request",
map[string]string{
"address": tsAddress},
map[string]interface{}{
"content_length": int64(94),
"response_time": float64(0)},
time.UnixMilli(0),
telegraf.Untyped,
),
}
var acc testutil.Accumulator
require.Error(t, acc.GatherError(p.Gather))
testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime())
}
func TestPrometheusInternalNoWeb(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
EnableRequestMetrics: true,
}
require.NoError(t, p.Init())
u, err := url.Parse(ts.URL)
require.NoError(t, err)
tsAddress := u.Hostname()
expected := []telegraf.Metric{
metric.New(
"prometheus_request",
map[string]string{
"address": tsAddress},
map[string]interface{}{
"content_length": int64(94),
"response_time": float64(0)},
time.UnixMilli(0),
telegraf.Untyped,
),
}
var acc testutil.Accumulator
testutil.PrintMetrics(acc.GetTelegrafMetrics())
require.Error(t, acc.GatherError(p.Gather))
testutil.RequireMetricsSubset(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreFields("content_length", "response_time"), testutil.IgnoreTime())
}

View File

@ -149,6 +149,9 @@
## enable TLS only if any of the other options are specified.
# tls_enable = true
## This option allows you to report the status of prometheus requests.
# enable_request_metrics = false
## Control pod scraping based on pod namespace annotations
## Pass and drop here act like tagpass and tagdrop, but instead
## of filtering metrics they filters pod candidates for scraping

View File

@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strings"
"testing"
"time"
@ -356,6 +357,8 @@ cpu_time_idle{host="example.org"} 42
func TestRoundTripMetricVersion1(t *testing.T) {
logger := testutil.Logger{Name: "outputs.prometheus_client"}
regxPattern := regexp.MustCompile(`.*prometheus_request_.*`)
tests := []struct {
name string
data []byte
@ -483,10 +486,10 @@ rpc_duration_seconds_count 2693
actual, err := io.ReadAll(resp.Body)
require.NoError(t, err)
current := regxPattern.ReplaceAllLiteralString(string(actual), "")
require.Equal(t,
strings.TrimSpace(string(tt.data)),
strings.TrimSpace(string(actual)))
strings.TrimSpace(current))
})
}
}

View File

@ -5,6 +5,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"regexp"
"strings"
"testing"
"time"
@ -386,6 +387,7 @@ cpu_time_idle{host="example.org"} 42
func TestRoundTripMetricVersion2(t *testing.T) {
logger := testutil.Logger{Name: "outputs.prometheus_client"}
regxPattern := regexp.MustCompile(`.*prometheus_request_.*`)
tests := []struct {
name string
data []byte
@ -514,9 +516,10 @@ rpc_duration_seconds_count 2693
actual, err := io.ReadAll(resp.Body)
require.NoError(t, err)
current := regxPattern.ReplaceAllLiteralString(string(actual), "")
require.Equal(t,
strings.TrimSpace(string(tt.data)),
strings.TrimSpace(string(actual)))
strings.TrimSpace(current))
})
}
}