diff --git a/plugins/inputs/prometheus/parser_test.go b/plugins/inputs/prometheus/parser_test.go deleted file mode 100644 index 48bba9759..000000000 --- a/plugins/inputs/prometheus/parser_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package prometheus - -import ( - "fmt" - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -const validUniqueGauge = ` -# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. -# TYPE cadvisor_version_info gauge -cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",` + - `kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 -` - -const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source -# TYPE get_token_fail_count counter -get_token_fail_count 0 -` - -const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. -# TYPE http_request_duration_microseconds summary -http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 -http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 -http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 -http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 -http_request_duration_microseconds_count{handler="prometheus"} 9 -` - -const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. -# TYPE apiserver_request_latencies histogram -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 -apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 -apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 -` - -func TestParseValidPrometheus(t *testing.T) { - // Gauge value - metrics, err := Parse([]byte(validUniqueGauge), http.Header{}, false) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "cadvisor_version_info", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "gauge": float64(1), - }, metrics[0].Fields()) - require.Equal(t, map[string]string{ - "osVersion": "CentOS Linux 7 (Core)", - "cadvisorRevision": "", - "cadvisorVersion": "", - "dockerVersion": "1.8.2", - "kernelVersion": "3.10.0-229.20.1.el7.x86_64", - }, metrics[0].Tags()) - - // Counter value - metrics, err = Parse([]byte(validUniqueCounter), http.Header{}, false) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "get_token_fail_count", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "counter": float64(0), - }, metrics[0].Fields()) - require.Equal(t, map[string]string{}, metrics[0].Tags()) - - // Summary data - //SetDefaultTags(map[string]string{}) - metrics, err = Parse([]byte(validUniqueSummary), http.Header{}, false) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "0.5": 552048.506, - "0.9": 5.876804288e+06, - "0.99": 5.876804288e+06, - "count": 9.0, - "sum": 1.8909097205e+07, - }, metrics[0].Fields()) - require.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) - - // histogram data - metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}, false) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "apiserver_request_latencies", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "500000": 2000.0, - "count": 2025.0, - "sum": 1.02726334e+08, - "250000": 1997.0, - "2e+06": 2012.0, - "4e+06": 2017.0, - "8e+06": 2024.0, - "+Inf": 2025.0, - "125000": 1994.0, - "1e+06": 2005.0, - }, metrics[0].Fields()) - require.Equal(t, - map[string]string{"verb": "POST", "resource": "bindings"}, - metrics[0].Tags()) -} - -func TestMetricsWithTimestamp(t *testing.T) { - testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) - testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) - metricsWithTimestamps := fmt.Sprintf(` -# TYPE test_counter counter -test_counter{label="test"} 1 %d -`, testTimeUnix) - - // IgnoreTimestamp is false - metrics, err := Parse([]byte(metricsWithTimestamps), http.Header{}, false) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "test_counter", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "counter": float64(1), - }, metrics[0].Fields()) - require.Equal(t, map[string]string{ - "label": "test", - }, metrics[0].Tags()) - require.Equal(t, testTime, metrics[0].Time().UTC()) - - // IgnoreTimestamp is true - metrics, err = Parse([]byte(metricsWithTimestamps), http.Header{}, true) - require.NoError(t, err) - require.Len(t, metrics, 1) - require.Equal(t, "test_counter", metrics[0].Name()) - require.Equal(t, map[string]interface{}{ - "counter": float64(1), - }, metrics[0].Fields()) - require.Equal(t, map[string]string{ - "label": "test", - }, metrics[0].Tags()) - require.WithinDuration(t, time.Now(), metrics[0].Time().UTC(), 5*time.Second) -} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 798b13058..d82eeb407 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -27,7 +27,7 @@ import ( "github.com/influxdata/telegraf/internal" httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/inputs" - parserV2 "github.com/influxdata/telegraf/plugins/parsers/prometheus" + parser "github.com/influxdata/telegraf/plugins/parsers/prometheus" ) //go:embed sample.conf @@ -207,6 +207,10 @@ func (p *Prometheus) Init() error { return err } + if p.MetricVersion == 0 { + p.MetricVersion = 1 + } + ctx := context.Background() if p.ResponseTimeout != 0 { p.HTTPClientConfig.Timeout = p.ResponseTimeout @@ -357,14 +361,14 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error { var req *http.Request - var err error var uClient *http.Client - var metrics []telegraf.Metric if u.URL.Scheme == "unix" { path := u.URL.Query().Get("path") if path == "" { path = "/metrics" } + + var err error addr := "http://localhost" + path req, err = http.NewRequest("GET", addr, nil) if err != nil { @@ -390,6 +394,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error if u.URL.Path == "" { u.URL.Path = "/metrics" } + var err error req, err = http.NewRequest("GET", u.URL.String(), nil) if err != nil { return fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err) @@ -414,6 +419,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error req.Header.Set(key, value) } + var err error var resp *http.Response if u.URL.Scheme != "unix" { //nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer` @@ -436,16 +442,13 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error return fmt.Errorf("error reading body: %w", err) } - if p.MetricVersion == 2 { - parser := parserV2.Parser{ - Header: resp.Header, - IgnoreTimestamp: p.IgnoreTimestamp, - } - metrics, err = parser.Parse(body) - } else { - metrics, err = Parse(body, resp.Header, p.IgnoreTimestamp) + // Parse the metrics + metricParser := parser.Parser{ + Header: resp.Header, + MetricVersion: p.MetricVersion, + IgnoreTimestamp: p.IgnoreTimestamp, } - + metrics, err := metricParser.Parse(body) if err != nil { return fmt.Errorf("error reading metrics for %q: %w", u.URL, err) } diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 2b1c37252..b28dfb1ea 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -476,15 +476,14 @@ func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) { URLTag: "url", IgnoreTimestamp: true, } - err := p.Init() - require.NoError(t, err) + require.NoError(t, p.Init()) var acc testutil.Accumulator + require.NoError(t, acc.GatherError(p.Gather)) - err = acc.GatherError(p.Gather) - require.NoError(t, err) - - m, _ := acc.Get("test_metric") + m, found := acc.Get("test_metric") + require.True(t, found) + require.NotNil(t, m) require.WithinDuration(t, time.Now(), m.Time, 5*time.Second) } diff --git a/plugins/parsers/prometheus/common/helpers.go b/plugins/parsers/prometheus/common.go similarity index 73% rename from plugins/parsers/prometheus/common/helpers.go rename to plugins/parsers/prometheus/common.go index bc1be0339..91b61da26 100644 --- a/plugins/parsers/prometheus/common/helpers.go +++ b/plugins/parsers/prometheus/common.go @@ -1,4 +1,4 @@ -package common +package prometheus import ( "github.com/influxdata/telegraf" @@ -20,16 +20,15 @@ func ValueType(mt dto.MetricType) telegraf.ValueType { } } -// Get labels from metric -func MakeLabels(m *dto.Metric, defaultTags map[string]string) map[string]string { +func GetTagsFromLabels(m *dto.Metric, defaultTags map[string]string) map[string]string { result := map[string]string{} for key, value := range defaultTags { result[key] = value } - for _, lp := range m.Label { - result[lp.GetName()] = lp.GetValue() + for _, label := range m.Label { + result[label.GetName()] = label.GetValue() } return result diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go index 58c4e683a..6a481f471 100644 --- a/plugins/parsers/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser.go @@ -1,217 +1,38 @@ package prometheus import ( - "bufio" - "bytes" - "errors" "fmt" - "io" - "math" - "mime" "net/http" - "time" - - "github.com/matttproud/golang_protobuf_extensions/pbutil" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/plugins/parsers/prometheus/common" ) type Parser struct { - DefaultTags map[string]string `toml:"-"` - Header http.Header `toml:"-"` // set by the prometheus input IgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"` -} - -func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { - var parser expfmt.TextParser - var metrics []telegraf.Metric - var err error - - // Make sure we have a finishing newline but no trailing one - buf = bytes.TrimPrefix(buf, []byte("\n")) - if !bytes.HasSuffix(buf, []byte("\n")) { - buf = append(buf, []byte("\n")...) - } - - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type")) - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if errors.Is(ierr, io.EOF) { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %w", err) - } - } - - now := time.Now() - - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := common.MakeLabels(m, p.DefaultTags) - t := p.GetTimestamp(m, now) - - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), t) - metrics = append(metrics, telegrafMetrics...) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), t) - metrics = append(metrics, telegrafMetrics...) - } else { - // standard metric - // reading fields - fields := getNameAndValue(m, metricName) - // converting to telegraf metric - if len(fields) > 0 { - m := metric.New("prometheus", tags, fields, t, common.ValueType(mf.GetType())) - metrics = append(metrics, m) - } - } - } - } - - return metrics, err -} - -func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line)) - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("no metrics in line") - } - - if len(metrics) > 1 { - return nil, fmt.Errorf("more than one metric in line") - } - - return metrics[0], nil + MetricVersion int `toml:"prometheus_metric_version"` + Header http.Header `toml:"-"` // set by the prometheus input + DefaultTags map[string]string `toml:"-"` } func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } -// Get Quantiles for summary metric & Buckets for histogram -func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { - metrics := make([]telegraf.Metric, 0, len(m.GetSummary().Quantile)+1) - fields := make(map[string]interface{}) - - fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) - fields[metricName+"_sum"] = m.GetSummary().GetSampleSum() - met := metric.New("prometheus", tags, fields, t, common.ValueType(metricType)) - metrics = append(metrics, met) - - for _, q := range m.GetSummary().Quantile { - newTags := tags - fields = make(map[string]interface{}) - - newTags["quantile"] = fmt.Sprint(q.GetQuantile()) - fields[metricName] = q.GetValue() - - quantileMetric := metric.New("prometheus", newTags, fields, t, common.ValueType(metricType)) - metrics = append(metrics, quantileMetric) +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + switch p.MetricVersion { + case 0, 2: + return p.parseV2(buf) + case 1: + return p.parseV1(buf) } - return metrics -} - -// Get Buckets from histogram metric -func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { - metrics := make([]telegraf.Metric, 0, len(m.GetHistogram().Bucket)+2) - fields := make(map[string]interface{}) - - fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) - fields[metricName+"_sum"] = m.GetHistogram().GetSampleSum() - - met := metric.New("prometheus", tags, fields, t, common.ValueType(metricType)) - metrics = append(metrics, met) - - infSeen := false - for _, b := range m.GetHistogram().Bucket { - newTags := tags - fields = make(map[string]interface{}) - newTags["le"] = fmt.Sprint(b.GetUpperBound()) - fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) - - histogramMetric := metric.New("prometheus", newTags, fields, t, common.ValueType(metricType)) - metrics = append(metrics, histogramMetric) - if math.IsInf(b.GetUpperBound(), +1) { - infSeen = true - } - } - // Infinity bucket is required for proper function of histogram in prometheus - if !infSeen { - newTags := tags - newTags["le"] = "+Inf" - - fields = make(map[string]interface{}) - fields[metricName+"_bucket"] = float64(m.GetHistogram().GetSampleCount()) - - histogramInfMetric := metric.New("prometheus", newTags, fields, t, common.ValueType(metricType)) - metrics = append(metrics, histogramInfMetric) - } - return metrics -} - -// Get name and value from metric -func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - fields[metricName] = m.GetGauge().GetValue() - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - fields[metricName] = m.GetCounter().GetValue() - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - fields[metricName] = m.GetUntyped().GetValue() - } - } - return fields -} - -func (p *Parser) GetTimestamp(m *dto.Metric, now time.Time) time.Time { - var t time.Time - if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, m.GetTimestampMs()*1000000) - } else { - t = now - } - return t + return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion) } func init() { parsers.Add("prometheus", func(defaultMetricName string) telegraf.Parser { return &Parser{} - }) + }, + ) } diff --git a/plugins/parsers/prometheus/parser_test.go b/plugins/parsers/prometheus/parser_test.go index 6d66be4c2..b3f5685dd 100644 --- a/plugins/parsers/prometheus/parser_test.go +++ b/plugins/parsers/prometheus/parser_test.go @@ -14,43 +14,44 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/parsers/prometheus/common" "github.com/influxdata/telegraf/testutil" ) const ( - //nolint:lll // conditionally long lines allowed - validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. -# TYPE cadvisor_version_info gauge -cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 -` + validUniqueGauge = ` + # HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. + # TYPE cadvisor_version_info gauge + cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",` + + `kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 + ` + validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source -# TYPE get_token_fail_count counter -get_token_fail_count 0 -` + # TYPE get_token_fail_count counter + get_token_fail_count 0 + ` validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. -# TYPE http_request_duration_microseconds summary -http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 -http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 -http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 -http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 -http_request_duration_microseconds_count{handler="prometheus"} 9 -` + # TYPE http_request_duration_microseconds summary + http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 + http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 + http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 + http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 + http_request_duration_microseconds_count{handler="prometheus"} 9 + ` validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. -# TYPE apiserver_request_latencies histogram -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 -apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 -apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 -` + # TYPE apiserver_request_latencies histogram + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 + apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 + apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 + apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 + ` validUniqueHistogramJSON = `{ "name": "apiserver_request_latencies", "help": "Response latency distribution in microseconds for each verb, resource and client.", @@ -643,12 +644,116 @@ func TestHistogramInfBucketPresence(t *testing.T) { require.NoError(t, err) m := metricFamily.Metric[0] - tags := common.MakeLabels(m, map[string]string{}) - metrics := makeBuckets(m, tags, *metricFamily.Name, metricFamily.GetType(), time.Now()) + tags := GetTagsFromLabels(m, map[string]string{}) + metrics := makeBucketsV2(m, tags, *metricFamily.Name, metricFamily.GetType(), time.Now()) testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) } +func TestParseValidPrometheusV1(t *testing.T) { + // Gauge value + plugin := &Parser{MetricVersion: 1} + metrics, err := plugin.Parse([]byte(validUniqueGauge)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "cadvisor_version_info", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "gauge": float64(1), + }, metrics[0].Fields()) + require.Equal(t, map[string]string{ + "osVersion": "CentOS Linux 7 (Core)", + "cadvisorRevision": "", + "cadvisorVersion": "", + "dockerVersion": "1.8.2", + "kernelVersion": "3.10.0-229.20.1.el7.x86_64", + }, metrics[0].Tags()) + + // Counter value + plugin = &Parser{MetricVersion: 1} + metrics, err = plugin.Parse([]byte(validUniqueCounter)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "get_token_fail_count", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "counter": float64(0), + }, metrics[0].Fields()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Summary data + plugin = &Parser{MetricVersion: 1} + metrics, err = plugin.Parse([]byte(validUniqueSummary)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "0.5": 552048.506, + "0.9": 5.876804288e+06, + "0.99": 5.876804288e+06, + "count": 9.0, + "sum": 1.8909097205e+07, + }, metrics[0].Fields()) + require.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) + + // histogram data + plugin = &Parser{MetricVersion: 1} + metrics, err = plugin.Parse([]byte(validUniqueHistogram)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "apiserver_request_latencies", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "500000": 2000.0, + "count": 2025.0, + "sum": 1.02726334e+08, + "250000": 1997.0, + "2e+06": 2012.0, + "4e+06": 2017.0, + "8e+06": 2024.0, + "+Inf": 2025.0, + "125000": 1994.0, + "1e+06": 2005.0, + }, metrics[0].Fields()) + require.Equal(t, + map[string]string{"verb": "POST", "resource": "bindings"}, + metrics[0].Tags()) +} + +func TestMetricsWithTimestampV1(t *testing.T) { + testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) + testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) + metricsWithTimestamps := fmt.Sprintf(` +# TYPE test_counter counter +test_counter{label="test"} 1 %d +`, testTimeUnix) + + // IgnoreTimestamp is false + plugin := &Parser{MetricVersion: 1} + metrics, err := plugin.Parse([]byte(metricsWithTimestamps)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "test_counter", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "counter": float64(1), + }, metrics[0].Fields()) + require.Equal(t, map[string]string{ + "label": "test", + }, metrics[0].Tags()) + require.Equal(t, testTime, metrics[0].Time().UTC()) + + // IgnoreTimestamp is true + plugin = &Parser{MetricVersion: 1, IgnoreTimestamp: true} + metrics, err = plugin.Parse([]byte(metricsWithTimestamps)) + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "test_counter", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ + "counter": float64(1), + }, metrics[0].Fields()) + require.Equal(t, map[string]string{ + "label": "test", + }, metrics[0].Tags()) + require.WithinDuration(t, time.Now(), metrics[0].Time().UTC(), 5*time.Second) +} + const benchmarkData = ` # HELP benchmark_a Test metric for benchmarking # TYPE benchmark_a gauge diff --git a/plugins/inputs/prometheus/parser.go b/plugins/parsers/prometheus/parser_v1.go similarity index 85% rename from plugins/inputs/prometheus/parser.go rename to plugins/parsers/prometheus/parser_v1.go index 6223a8ebd..a3fe30fd1 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser_v1.go @@ -17,10 +17,9 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/parsers/prometheus/common" ) -func Parse(buf []byte, header http.Header, ignoreTimestamp bool) ([]telegraf.Metric, error) { +func (p *Parser) parseV1(buf []byte) ([]telegraf.Metric, error) { var parser expfmt.TextParser var metrics []telegraf.Metric var err error @@ -33,7 +32,7 @@ func Parse(buf []byte, header http.Header, ignoreTimestamp bool) ([]telegraf.Met // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) - if isProtobuf(header) { + if isProtobuf(p.Header) { for { mf := &dto.MetricFamily{} if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { @@ -56,35 +55,35 @@ func Parse(buf []byte, header http.Header, ignoreTimestamp bool) ([]telegraf.Met for metricName, mf := range metricFamilies { for _, m := range mf.Metric { // reading tags - tags := common.MakeLabels(m, nil) + tags := GetTagsFromLabels(m, nil) // reading fields var fields map[string]interface{} if mf.GetType() == dto.MetricType_SUMMARY { // summary metric - fields = makeQuantiles(m) + fields = makeQuantilesV1(m) fields["count"] = float64(m.GetSummary().GetSampleCount()) //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 fields["sum"] = float64(m.GetSummary().GetSampleSum()) } else if mf.GetType() == dto.MetricType_HISTOGRAM { // histogram metric - fields = makeBuckets(m) + fields = makeBucketsV1(m) fields["count"] = float64(m.GetHistogram().GetSampleCount()) //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 fields["sum"] = float64(m.GetHistogram().GetSampleSum()) } else { // standard metric - fields = getNameAndValue(m) + fields = getNameAndValueV1(m) } // converting to telegraf metric if len(fields) > 0 { var t time.Time - if !ignoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { + if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { t = time.Unix(0, *m.TimestampMs*1000000) } else { t = now } - m := metric.New(metricName, tags, fields, t, common.ValueType(mf.GetType())) + m := metric.New(metricName, tags, fields, t, ValueType(mf.GetType())) metrics = append(metrics, m) } } @@ -105,7 +104,7 @@ func isProtobuf(header http.Header) bool { } // Get Quantiles from summary metric -func makeQuantiles(m *dto.Metric) map[string]interface{} { +func makeQuantilesV1(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) for _, q := range m.GetSummary().Quantile { if !math.IsNaN(q.GetValue()) { @@ -117,7 +116,7 @@ func makeQuantiles(m *dto.Metric) map[string]interface{} { } // Get Buckets from histogram metric -func makeBuckets(m *dto.Metric) map[string]interface{} { +func makeBucketsV1(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) for _, b := range m.GetHistogram().Bucket { fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) @@ -126,7 +125,7 @@ func makeBuckets(m *dto.Metric) map[string]interface{} { } // Get name and value from metric -func getNameAndValue(m *dto.Metric) map[string]interface{} { +func getNameAndValueV1(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) if m.Gauge != nil { if !math.IsNaN(m.GetGauge().GetValue()) { diff --git a/plugins/parsers/prometheus/parser_v2.go b/plugins/parsers/prometheus/parser_v2.go new file mode 100644 index 000000000..edb096430 --- /dev/null +++ b/plugins/parsers/prometheus/parser_v2.go @@ -0,0 +1,197 @@ +package prometheus + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "math" + "mime" + "time" + + "github.com/matttproud/golang_protobuf_extensions/pbutil" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +func (p *Parser) parseV2(buf []byte) ([]telegraf.Metric, error) { + var parser expfmt.TextParser + var metrics []telegraf.Metric + var err error + + // Make sure we have a finishing newline but no trailing one + buf = bytes.TrimPrefix(buf, []byte("\n")) + if !bytes.HasSuffix(buf, []byte("\n")) { + buf = append(buf, []byte("\n")...) + } + + // Read raw data + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + + // Prepare output + metricFamilies := make(map[string]*dto.MetricFamily) + mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type")) + if err == nil && mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" { + for { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if errors.Is(ierr, io.EOF) { + break + } + return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr) + } + metricFamilies[mf.GetName()] = mf + } + } else { + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %w", err) + } + } + + now := time.Now() + + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := GetTagsFromLabels(m, p.DefaultTags) + t := p.getTimestampV2(m, now) + + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), t) + metrics = append(metrics, telegrafMetrics...) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // histogram metric + telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), t) + metrics = append(metrics, telegrafMetrics...) + } else { + // standard metric + // reading fields + fields := getNameAndValueV2(m, metricName) + // converting to telegraf metric + if len(fields) > 0 { + m := metric.New("prometheus", tags, fields, t, ValueType(mf.GetType())) + metrics = append(metrics, m) + } + } + } + } + + return metrics, err +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("no metrics in line") + } + + if len(metrics) > 1 { + return nil, fmt.Errorf("more than one metric in line") + } + + return metrics[0], nil +} + +// Get Quantiles for summary metric & Buckets for histogram +func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { + metrics := make([]telegraf.Metric, 0, len(m.GetSummary().Quantile)+1) + fields := make(map[string]interface{}) + + fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) + fields[metricName+"_sum"] = m.GetSummary().GetSampleSum() + met := metric.New("prometheus", tags, fields, t, ValueType(metricType)) + metrics = append(metrics, met) + + for _, q := range m.GetSummary().Quantile { + newTags := tags + fields = make(map[string]interface{}) + + newTags["quantile"] = fmt.Sprint(q.GetQuantile()) + fields[metricName] = q.GetValue() + + quantileMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) + metrics = append(metrics, quantileMetric) + } + return metrics +} + +// Get Buckets from histogram metric +func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { + metrics := make([]telegraf.Metric, 0, len(m.GetHistogram().Bucket)+2) + fields := make(map[string]interface{}) + + fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) + fields[metricName+"_sum"] = m.GetHistogram().GetSampleSum() + + met := metric.New("prometheus", tags, fields, t, ValueType(metricType)) + metrics = append(metrics, met) + + infSeen := false + for _, b := range m.GetHistogram().Bucket { + newTags := tags + fields = make(map[string]interface{}) + newTags["le"] = fmt.Sprint(b.GetUpperBound()) + fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) + + histogramMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) + metrics = append(metrics, histogramMetric) + if math.IsInf(b.GetUpperBound(), +1) { + infSeen = true + } + } + // Infinity bucket is required for proper function of histogram in prometheus + if !infSeen { + newTags := tags + newTags["le"] = "+Inf" + + fields = make(map[string]interface{}) + fields[metricName+"_bucket"] = float64(m.GetHistogram().GetSampleCount()) + + histogramInfMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) + metrics = append(metrics, histogramInfMetric) + } + return metrics +} + +// Get name and value from metric +func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} { + fields := make(map[string]interface{}) + if m.Gauge != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields[metricName] = m.GetGauge().GetValue() + } + } else if m.Counter != nil { + if !math.IsNaN(m.GetCounter().GetValue()) { + fields[metricName] = m.GetCounter().GetValue() + } + } else if m.Untyped != nil { + if !math.IsNaN(m.GetUntyped().GetValue()) { + fields[metricName] = m.GetUntyped().GetValue() + } + } + return fields +} + +func (p *Parser) getTimestampV2(m *dto.Metric, now time.Time) time.Time { + var t time.Time + if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, m.GetTimestampMs()*1000000) + } else { + t = now + } + return t +}