diff --git a/config/config.go b/config/config.go index 76aa494c4..d6081aedc 100644 --- a/config/config.go +++ b/config/config.go @@ -1593,7 +1593,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2", "lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix", "name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision", - "prefix", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", + "prefix", "prometheus_export_timestamp", "prometheus_ignore_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", "separator", "splunkmetric_hec_routing", "splunkmetric_multimetric", "tag_keys", "tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "template", "templates", "value_field_name", "wavefront_source_override", "wavefront_use_strict", diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 2892d9914..12672da45 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -8305,6 +8305,10 @@ # ## Url tag name (tag containing scrapped url. optional, default is "url") # # url_tag = "url" # +# ## Whether the timestamp of the scraped metrics will be ignored. +# ## If set to true, the gather time will be used. +# # ignore_timestamp = false +# # ## An array of Kubernetes services to scrape metrics from. # # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] # diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 955c6ab7d..fe6d3a8e8 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -23,6 +23,10 @@ in Prometheus format. ## Url tag name (tag containing scrapped url. optional, default is "url") # url_tag = "url" + ## Whether the timestamp of the scraped metrics will be ignored. + ## If set to true, the gather time will be used. + # ignore_timestamp = false + ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index 7d3140dc7..dfe5cc474 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -19,7 +19,7 @@ import ( "github.com/prometheus/common/expfmt" ) -func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { +func Parse(buf []byte, header http.Header, ignoreTimestamp bool) ([]telegraf.Metric, error) { var parser expfmt.TextParser var metrics []telegraf.Metric var err error @@ -76,7 +76,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { // converting to telegraf metric if len(fields) > 0 { var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { + if !ignoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { t = time.Unix(0, *m.TimestampMs*1000000) } else { t = now diff --git a/plugins/inputs/prometheus/parser_test.go b/plugins/inputs/prometheus/parser_test.go index 293e1968d..ffd596745 100644 --- a/plugins/inputs/prometheus/parser_test.go +++ b/plugins/inputs/prometheus/parser_test.go @@ -1,8 +1,10 @@ package prometheus import ( + "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -42,7 +44,7 @@ apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 func TestParseValidPrometheus(t *testing.T) { // Gauge value - metrics, err := Parse([]byte(validUniqueGauge), http.Header{}) + metrics, err := Parse([]byte(validUniqueGauge), http.Header{}, false) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) @@ -58,7 +60,7 @@ func TestParseValidPrometheus(t *testing.T) { }, metrics[0].Tags()) // Counter value - metrics, err = Parse([]byte(validUniqueCounter), http.Header{}) + metrics, err = Parse([]byte(validUniqueCounter), http.Header{}, false) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "get_token_fail_count", metrics[0].Name()) @@ -69,7 +71,7 @@ func TestParseValidPrometheus(t *testing.T) { // Summary data //SetDefaultTags(map[string]string{}) - metrics, err = Parse([]byte(validUniqueSummary), http.Header{}) + metrics, err = Parse([]byte(validUniqueSummary), http.Header{}, false) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) @@ -83,7 +85,7 @@ func TestParseValidPrometheus(t *testing.T) { assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) // histogram data - metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}) + metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}, false) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) @@ -103,3 +105,38 @@ func TestParseValidPrometheus(t *testing.T) { map[string]string{"verb": "POST", "resource": "bindings"}, metrics[0].Tags()) } + +func TestMetricsWithTimestamp(t *testing.T) { + testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) + testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) + metricsWithTimestamps := fmt.Sprintf(` +# TYPE test_counter counter +test_counter{label="test"} 1 %d +`, testTimeUnix) + + // IgnoreTimestamp is false + metrics, err := Parse([]byte(metricsWithTimestamps), http.Header{}, false) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "test_counter", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "counter": float64(1), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "label": "test", + }, metrics[0].Tags()) + assert.Equal(t, testTime, metrics[0].Time().UTC()) + + // IgnoreTimestamp is true + metrics, err = Parse([]byte(metricsWithTimestamps), http.Header{}, true) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "test_counter", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "counter": float64(1), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "label": "test", + }, metrics[0].Tags()) + assert.WithinDuration(t, time.Now(), metrics[0].Time().UTC(), 5*time.Second) +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 136e8ae0f..18cbf6c8b 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -58,6 +58,8 @@ type Prometheus struct { URLTag string `toml:"url_tag"` + IgnoreTimestamp bool `toml:"ignore_timestamp"` + tls.ClientConfig Log telegraf.Logger @@ -101,6 +103,10 @@ var sampleConfig = ` ## Url tag name (tag containing scrapped url. optional, default is "url") # url_tag = "url" + ## Whether the timestamp of the scraped metrics will be ignored. + ## If set to true, the gather time will be used. + # ignore_timestamp = false + ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] @@ -414,10 +420,13 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error } if p.MetricVersion == 2 { - parser := parser_v2.Parser{Header: resp.Header} + parser := parser_v2.Parser{ + Header: resp.Header, + IgnoreTimestamp: p.IgnoreTimestamp, + } metrics, err = parser.Parse(body) } else { - metrics, err = Parse(body, resp.Header) + metrics, err = Parse(body, resp.Header, p.IgnoreTimestamp) } if err != nil { diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index ea8ca0e93..11117e05b 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -242,6 +242,29 @@ func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) { assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0))) } +func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := fmt.Fprintln(w, sampleTextFormat) + require.NoError(t, err) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + URLs: []string{ts.URL}, + URLTag: "url", + IgnoreTimestamp: true, + } + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + m, _ := acc.Get("test_metric") + assert.WithinDuration(t, time.Now(), m.Time, 5*time.Second) +} + func TestUnsupportedFieldSelector(t *testing.T) { fieldSelectorString := "spec.containerName=container" prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString} diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go index e55789f79..bc7ea0c63 100644 --- a/plugins/parsers/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser.go @@ -21,8 +21,9 @@ import ( ) type Parser struct { - DefaultTags map[string]string - Header http.Header + DefaultTags map[string]string + Header http.Header + IgnoreTimestamp bool } func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { @@ -65,14 +66,15 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { for _, m := range mf.Metric { // reading tags tags := common.MakeLabels(m, p.DefaultTags) + t := p.GetTimestamp(m, now) if mf.GetType() == dto.MetricType_SUMMARY { // summary metric - telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), now) + telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), t) metrics = append(metrics, telegrafMetrics...) } else if mf.GetType() == dto.MetricType_HISTOGRAM { // histogram metric - telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), now) + telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), t) metrics = append(metrics, telegrafMetrics...) } else { // standard metric @@ -80,7 +82,6 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { fields := getNameAndValue(m, metricName) // converting to telegraf metric if len(fields) > 0 { - t := getTimestamp(m, now) m := metric.New("prometheus", tags, fields, t, common.ValueType(mf.GetType())) metrics = append(metrics, m) } @@ -113,10 +114,9 @@ func (p *Parser) SetDefaultTags(tags map[string]string) { } // Get Quantiles for summary metric & Buckets for histogram -func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { +func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { var metrics []telegraf.Metric fields := make(map[string]interface{}) - t := getTimestamp(m, now) fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum()) @@ -137,10 +137,9 @@ func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, met } // Get Buckets from histogram metric -func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { +func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { var metrics []telegraf.Metric fields := make(map[string]interface{}) - t := getTimestamp(m, now) fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum()) @@ -179,9 +178,9 @@ func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} { return fields } -func getTimestamp(m *dto.Metric, now time.Time) time.Time { +func (p *Parser) GetTimestamp(m *dto.Metric, now time.Time) time.Time { var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { + if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { t = time.Unix(0, m.GetTimestampMs()*1000000) } else { t = now diff --git a/plugins/parsers/prometheus/parser_test.go b/plugins/parsers/prometheus/parser_test.go index a403887e0..52ef2f5a3 100644 --- a/plugins/parsers/prometheus/parser_test.go +++ b/plugins/parsers/prometheus/parser_test.go @@ -74,7 +74,7 @@ func TestParsingValidGauge(t *testing.T) { testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) } -func TestParsingValieCounter(t *testing.T) { +func TestParsingValidCounter(t *testing.T) { expected := []telegraf.Metric{ testutil.MustMetric( "prometheus", @@ -340,6 +340,32 @@ test_counter{label="test"} 1 %d testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics()) } +func TestMetricsWithoutIgnoreTimestamp(t *testing.T) { + testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) + testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) + metricsWithTimestamps := fmt.Sprintf(` +# TYPE test_counter counter +test_counter{label="test"} 1 %d +`, testTimeUnix) + expected := testutil.MustMetric( + "prometheus", + map[string]string{ + "label": "test", + }, + map[string]interface{}{ + "test_counter": float64(1.0), + }, + testTime, + telegraf.Counter, + ) + + parser := Parser{IgnoreTimestamp: true} + metric, _ := parser.ParseLine(metricsWithTimestamps) + + testutil.RequireMetricEqual(t, expected, metric, testutil.IgnoreTime(), testutil.SortMetrics()) + assert.WithinDuration(t, time.Now(), metric.Time(), 5*time.Second) +} + func parse(buf []byte) ([]telegraf.Metric, error) { parser := Parser{} return parser.Parse(buf) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index f07c789a2..fcdfc473a 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -156,6 +156,9 @@ type Config struct { // FormData configuration FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"` + // Prometheus configuration + PrometheusIgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"` + // Value configuration ValueFieldName string `toml:"value_field_name"` @@ -259,7 +262,10 @@ func NewParser(config *Config) (Parser, error) { config.FormUrlencodedTagKeys, ) case "prometheus": - parser, err = NewPrometheusParser(config.DefaultTags) + parser, err = NewPrometheusParser( + config.DefaultTags, + config.PrometheusIgnoreTimestamp, + ) case "prometheusremotewrite": parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags) case "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf": @@ -378,9 +384,10 @@ func NewFormUrlencodedParser( }, nil } -func NewPrometheusParser(defaultTags map[string]string) (Parser, error) { +func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) { return &prometheus.Parser{ - DefaultTags: defaultTags, + DefaultTags: defaultTags, + IgnoreTimestamp: ignoreTimestamp, }, nil }