diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index b71650168..ff660ab20 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -5,15 +5,16 @@ using a configurable parser into [metrics][]. This allows, for example, the `kafka_consumer` input plugin to process messages in either InfluxDB Line Protocol or in JSON format. -- [InfluxDB Line Protocol](/plugins/parsers/influx) - [Collectd](/plugins/parsers/collectd) - [CSV](/plugins/parsers/csv) - [Dropwizard](/plugins/parsers/dropwizard) - [Graphite](/plugins/parsers/graphite) - [Grok](/plugins/parsers/grok) +- [InfluxDB Line Protocol](/plugins/parsers/influx) - [JSON](/plugins/parsers/json) - [Logfmt](/plugins/parsers/logfmt) - [Nagios](/plugins/parsers/nagios) +- [Prometheus](/plugins/parsers/prometheus) - [Value](/plugins/parsers/value), ie: 45 or "booyah" - [Wavefront](/plugins/parsers/wavefront) diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index 0726c8771..c2235c692 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -1,8 +1,5 @@ package prometheus -// Parser inspired from -// https://github.com/prometheus/prom2json/blob/master/main.go - import ( "bufio" "bytes" @@ -15,168 +12,27 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + . "github.com/influxdata/telegraf/plugins/parsers/prometheus/common" + "github.com/matttproud/golang_protobuf_extensions/pbutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" ) -// Parse returns a slice of Metrics from a text representation of a -// metrics -func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) { - var metrics []telegraf.Metric - var parser expfmt.TextParser - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if ierr == io.EOF { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %s", err) - } - } - - // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds - now := time.Now() - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := makeLabels(m) - - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), now) - metrics = append(metrics, telegrafMetrics...) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), now) - metrics = append(metrics, telegrafMetrics...) - } else { - // standard metric - // reading fields - fields := getNameAndValueV2(m, metricName) - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType())) - if err == nil { - metrics = append(metrics, metric) - } - } - } - } - } - - return metrics, err -} - -// Get Quantiles for summary metric & Buckets for histogram -func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { - var metrics []telegraf.Metric - fields := make(map[string]interface{}) - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) - fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum()) - met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, met) - } - - for _, q := range m.GetSummary().Quantile { - newTags := tags - fields = make(map[string]interface{}) - - newTags["quantile"] = fmt.Sprint(q.GetQuantile()) - fields[metricName] = float64(q.GetValue()) - - quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, quantileMetric) - } - } - return metrics -} - -// Get Buckets from histogram metric -func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { - var metrics []telegraf.Metric - fields := make(map[string]interface{}) - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) - fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum()) - - met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, met) - } - - for _, b := range m.GetHistogram().Bucket { - newTags := tags - fields = make(map[string]interface{}) - newTags["le"] = fmt.Sprint(b.GetUpperBound()) - fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) - - histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, histogramMetric) - } - } - return metrics -} - -// Parse returns a slice of Metrics from a text representation of a -// metrics func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { - var metrics []telegraf.Metric var parser expfmt.TextParser + var metrics []telegraf.Metric + var err error // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) // Read raw data buffer := bytes.NewBuffer(buf) reader := bufio.NewReader(buffer) - mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { + if isProtobuf(header) { for { mf := &dto.MetricFamily{} if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { @@ -194,13 +50,13 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { } } - // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds now := time.Now() // read metrics for metricName, mf := range metricFamilies { for _, m := range mf.Metric { // reading tags - tags := makeLabels(m) + tags := MakeLabels(m, nil) + // reading fields var fields map[string]interface{} if mf.GetType() == dto.MetricType_SUMMARY { @@ -226,7 +82,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { } else { t = now } - metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType())) + metric, err := metric.New(metricName, tags, fields, t, ValueType(mf.GetType())) if err == nil { metrics = append(metrics, metric) } @@ -237,19 +93,16 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { return metrics, err } -func valueType(mt dto.MetricType) telegraf.ValueType { - switch mt { - case dto.MetricType_COUNTER: - return telegraf.Counter - case dto.MetricType_GAUGE: - return telegraf.Gauge - case dto.MetricType_SUMMARY: - return telegraf.Summary - case dto.MetricType_HISTOGRAM: - return telegraf.Histogram - default: - return telegraf.Untyped +func isProtobuf(header http.Header) bool { + mediatype, params, error := mime.ParseMediaType(header.Get("Content-Type")) + + if error != nil { + return false } + + return mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" } // Get Quantiles from summary metric @@ -272,15 +125,6 @@ func makeBuckets(m *dto.Metric) map[string]interface{} { return fields } -// Get labels from metric -func makeLabels(m *dto.Metric) map[string]string { - result := map[string]string{} - for _, lp := range m.Label { - result[lp.GetName()] = lp.GetValue() - } - return result -} - // Get name and value from metric func getNameAndValue(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) @@ -299,22 +143,3 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} { } return fields } - -// Get name and value from metric -func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - fields[metricName] = float64(m.GetGauge().GetValue()) - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - fields[metricName] = float64(m.GetCounter().GetValue()) - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - fields[metricName] = float64(m.GetUntyped().GetValue()) - } - } - return fields -} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 70d72e0b0..5a7891ceb 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" + parser_v2 "github.com/influxdata/telegraf/plugins/parsers/prometheus" ) const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` @@ -329,7 +330,8 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error } if p.MetricVersion == 2 { - metrics, err = ParseV2(body, resp.Header) + parser := parser_v2.Parser{} + metrics, err = parser.Parse(body) } else { metrics, err = Parse(body, resp.Header) } diff --git a/plugins/parsers/prometheus/README.md b/plugins/parsers/prometheus/README.md new file mode 100644 index 000000000..931008e88 --- /dev/null +++ b/plugins/parsers/prometheus/README.md @@ -0,0 +1,17 @@ +# Prometheus Text-Based Format + +There are no additional configuration options for [Prometheus Text-Based Format][]. The metrics are parsed directly into Telegraf metrics. It is used internally in [prometheus input](/plugins/inputs/prometheus) or can be used in [http_listener_v2](/plugins/inputs/http_listener_v2) to simulate Pushgateway. + +[Prometheus Text-Based Format]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format + +```toml +[[inputs.file]] + files = ["example"] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "prometheus" + +``` diff --git a/plugins/parsers/prometheus/common/helpers.go b/plugins/parsers/prometheus/common/helpers.go new file mode 100644 index 000000000..bc1be0339 --- /dev/null +++ b/plugins/parsers/prometheus/common/helpers.go @@ -0,0 +1,36 @@ +package common + +import ( + "github.com/influxdata/telegraf" + dto "github.com/prometheus/client_model/go" +) + +func ValueType(mt dto.MetricType) telegraf.ValueType { + switch mt { + case dto.MetricType_COUNTER: + return telegraf.Counter + case dto.MetricType_GAUGE: + return telegraf.Gauge + case dto.MetricType_SUMMARY: + return telegraf.Summary + case dto.MetricType_HISTOGRAM: + return telegraf.Histogram + default: + return telegraf.Untyped + } +} + +// Get labels from metric +func MakeLabels(m *dto.Metric, defaultTags map[string]string) map[string]string { + result := map[string]string{} + + for key, value := range defaultTags { + result[key] = value + } + + for _, lp := range m.Label { + result[lp.GetName()] = lp.GetValue() + } + + return result +} diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go new file mode 100644 index 000000000..c5355ffe0 --- /dev/null +++ b/plugins/parsers/prometheus/parser.go @@ -0,0 +1,179 @@ +package prometheus + +import ( + "bufio" + "bytes" + "fmt" + "math" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + . "github.com/influxdata/telegraf/plugins/parsers/prometheus/common" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +type Parser struct { + DefaultTags map[string]string +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + var parser expfmt.TextParser + var metrics []telegraf.Metric + var err error + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + // Read raw data + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + + // Prepare output + metricFamilies := make(map[string]*dto.MetricFamily) + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %s", err) + } + + now := time.Now() + + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := MakeLabels(m, p.DefaultTags) + + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), now) + metrics = append(metrics, telegrafMetrics...) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // histogram metric + telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), now) + metrics = append(metrics, telegrafMetrics...) + } else { + // standard metric + // reading fields + fields := make(map[string]interface{}) + fields = getNameAndValue(m, metricName) + // converting to telegraf metric + if len(fields) > 0 { + t := getTimestamp(m, now) + metric, err := metric.New("prometheus", tags, fields, t, ValueType(mf.GetType())) + if err == nil { + metrics = append(metrics, metric) + } + } + } + } + } + + return metrics, err +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("No metrics in line") + } + + if len(metrics) > 1 { + return nil, fmt.Errorf("More than one metric in line") + } + + return metrics[0], nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + +// Get Quantiles for summary metric & Buckets for histogram +func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { + var metrics []telegraf.Metric + fields := make(map[string]interface{}) + t := getTimestamp(m, now) + + fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) + fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum()) + met, err := metric.New("prometheus", tags, fields, t, ValueType(metricType)) + if err == nil { + metrics = append(metrics, met) + } + + for _, q := range m.GetSummary().Quantile { + newTags := tags + fields = make(map[string]interface{}) + + newTags["quantile"] = fmt.Sprint(q.GetQuantile()) + fields[metricName] = float64(q.GetValue()) + + quantileMetric, err := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) + if err == nil { + metrics = append(metrics, quantileMetric) + } + } + return metrics +} + +// Get Buckets from histogram metric +func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { + var metrics []telegraf.Metric + fields := make(map[string]interface{}) + t := getTimestamp(m, now) + + fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) + fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum()) + + met, err := metric.New("prometheus", tags, fields, t, ValueType(metricType)) + if err == nil { + metrics = append(metrics, met) + } + + for _, b := range m.GetHistogram().Bucket { + newTags := tags + fields = make(map[string]interface{}) + newTags["le"] = fmt.Sprint(b.GetUpperBound()) + fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) + + histogramMetric, err := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) + if err == nil { + metrics = append(metrics, histogramMetric) + } + } + return metrics +} + +// Get name and value from metric +func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} { + fields := make(map[string]interface{}) + if m.Gauge != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields[metricName] = float64(m.GetGauge().GetValue()) + } + } else if m.Counter != nil { + if !math.IsNaN(m.GetCounter().GetValue()) { + fields[metricName] = float64(m.GetCounter().GetValue()) + } + } else if m.Untyped != nil { + if !math.IsNaN(m.GetUntyped().GetValue()) { + fields[metricName] = float64(m.GetUntyped().GetValue()) + } + } + return fields +} + +func getTimestamp(m *dto.Metric, now time.Time) time.Time { + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, m.GetTimestampMs()*1000000) + } else { + t = now + } + return t +} diff --git a/plugins/parsers/prometheus/parser_test.go b/plugins/parsers/prometheus/parser_test.go new file mode 100644 index 000000000..74530ef1b --- /dev/null +++ b/plugins/parsers/prometheus/parser_test.go @@ -0,0 +1,346 @@ +package prometheus + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +const ( + validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. +# TYPE cadvisor_version_info gauge +cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 +` + validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +# TYPE get_token_fail_count counter +get_token_fail_count 0 +` + + validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +` + + validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. +# TYPE http_request_duration_microseconds summary +http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 +http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 +http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 +http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 +http_request_duration_microseconds_count{handler="prometheus"} 9 +` + + validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. +# TYPE apiserver_request_latencies histogram +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 +apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 +apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 +` +) + +func TestParsingValidGauge(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "osVersion": "CentOS Linux 7 (Core)", + "cadvisorRevision": "", + "cadvisorVersion": "", + "dockerVersion": "1.8.2", + "kernelVersion": "3.10.0-229.20.1.el7.x86_64", + }, + map[string]interface{}{ + "cadvisor_version_info": float64(1), + }, + time.Unix(0, 0), + telegraf.Gauge, + ), + } + + metrics, err := parse([]byte(validUniqueGauge)) + + assert.NoError(t, err) + assert.Len(t, metrics, 1) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestParsingValieCounter(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "get_token_fail_count": float64(0), + }, + time.Unix(0, 0), + telegraf.Counter, + ), + } + + metrics, err := parse([]byte(validUniqueCounter)) + + assert.NoError(t, err) + assert.Len(t, metrics, 1) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestParsingValidSummary(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "handler": "prometheus", + }, + map[string]interface{}{ + "http_request_duration_microseconds_sum": float64(1.8909097205e+07), + "http_request_duration_microseconds_count": float64(9.0), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "handler": "prometheus", + "quantile": "0.5", + }, + map[string]interface{}{ + "http_request_duration_microseconds": float64(552048.506), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "handler": "prometheus", + "quantile": "0.9", + }, + map[string]interface{}{ + "http_request_duration_microseconds": float64(5.876804288e+06), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "handler": "prometheus", + "quantile": "0.99", + }, + map[string]interface{}{ + "http_request_duration_microseconds": float64(5.876804288e+6), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + } + + metrics, err := parse([]byte(validUniqueSummary)) + + assert.NoError(t, err) + assert.Len(t, metrics, 4) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestParsingValidHistogram(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + }, + map[string]interface{}{ + "apiserver_request_latencies_count": float64(2025.0), + "apiserver_request_latencies_sum": float64(1.02726334e+08), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "125000", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(1994.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "250000", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(1997.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "500000", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2000.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "1e+06", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2005.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "2e+06", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2012.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "4e+06", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2017.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "8e+06", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2024.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "verb": "POST", + "resource": "bindings", + "le": "+Inf", + }, + map[string]interface{}{ + "apiserver_request_latencies_bucket": float64(2025.0), + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + } + + metrics, err := parse([]byte(validUniqueHistogram)) + + assert.NoError(t, err) + assert.Len(t, metrics, 9) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestDefautTags(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "osVersion": "CentOS Linux 7 (Core)", + "cadvisorRevision": "", + "cadvisorVersion": "", + "dockerVersion": "1.8.2", + "kernelVersion": "3.10.0-229.20.1.el7.x86_64", + "defaultTag": "defaultTagValue", + }, + map[string]interface{}{ + "cadvisor_version_info": float64(1), + }, + time.Unix(0, 0), + telegraf.Gauge, + ), + } + + parser := Parser{ + DefaultTags: map[string]string{ + "defaultTag": "defaultTagValue", + "dockerVersion": "to_be_overriden", + }, + } + metrics, err := parser.Parse([]byte(validUniqueGauge)) + + assert.NoError(t, err) + assert.Len(t, metrics, 1) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestMetricsWithTimestamp(t *testing.T) { + testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) + testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) + metricsWithTimestamps := fmt.Sprintf(` +# TYPE test_counter counter +test_counter{label="test"} 1 %d +`, testTimeUnix) + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "label": "test", + }, + map[string]interface{}{ + "test_counter": float64(1.0), + }, + testTime, + telegraf.Counter, + ), + } + + metrics, _ := parse([]byte(metricsWithTimestamps)) + + testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics()) +} + +func parse(buf []byte) ([]telegraf.Metric, error) { + parser := Parser{} + return parser.Parse(buf) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 729ed048c..ac31a374d 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" + "github.com/influxdata/telegraf/plugins/parsers/prometheus" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/plugins/parsers/wavefront" ) @@ -232,6 +233,8 @@ func NewParser(config *Config) (Parser, error) { config.DefaultTags, config.FormUrlencodedTagKeys, ) + case "prometheus": + parser, err = NewPrometheusParser(config.DefaultTags) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -339,3 +342,9 @@ func NewFormUrlencodedParser( TagKeys: tagKeys, }, nil } + +func NewPrometheusParser(defaultTags map[string]string) (Parser, error) { + return &prometheus.Parser{ + DefaultTags: defaultTags, + }, nil +}