feat(prometheus): add ignore_timestamp option (#9740)
This commit is contained in:
parent
014161cd0c
commit
d2a25456d5
|
|
@ -1593,7 +1593,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
|||
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2",
|
||||
"lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
|
||||
"name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision",
|
||||
"prefix", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
|
||||
"prefix", "prometheus_export_timestamp", "prometheus_ignore_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
|
||||
"separator", "splunkmetric_hec_routing", "splunkmetric_multimetric", "tag_keys",
|
||||
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "template", "templates",
|
||||
"value_field_name", "wavefront_source_override", "wavefront_use_strict",
|
||||
|
|
|
|||
|
|
@ -8305,6 +8305,10 @@
|
|||
# ## Url tag name (tag containing scrapped url. optional, default is "url")
|
||||
# # url_tag = "url"
|
||||
#
|
||||
# ## Whether the timestamp of the scraped metrics will be ignored.
|
||||
# ## If set to true, the gather time will be used.
|
||||
# # ignore_timestamp = false
|
||||
#
|
||||
# ## An array of Kubernetes services to scrape metrics from.
|
||||
# # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
|
||||
#
|
||||
|
|
|
|||
|
|
@ -23,6 +23,10 @@ in Prometheus format.
|
|||
## Url tag name (tag containing scrapped url. optional, default is "url")
|
||||
# url_tag = "url"
|
||||
|
||||
## Whether the timestamp of the scraped metrics will be ignored.
|
||||
## If set to true, the gather time will be used.
|
||||
# ignore_timestamp = false
|
||||
|
||||
## An array of Kubernetes services to scrape metrics from.
|
||||
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
||||
func Parse(buf []byte, header http.Header, ignoreTimestamp bool) ([]telegraf.Metric, error) {
|
||||
var parser expfmt.TextParser
|
||||
var metrics []telegraf.Metric
|
||||
var err error
|
||||
|
|
@ -76,7 +76,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
|||
// converting to telegraf metric
|
||||
if len(fields) > 0 {
|
||||
var t time.Time
|
||||
if m.TimestampMs != nil && *m.TimestampMs > 0 {
|
||||
if !ignoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 {
|
||||
t = time.Unix(0, *m.TimestampMs*1000000)
|
||||
} else {
|
||||
t = now
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
|
@ -42,7 +44,7 @@ apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
|
|||
|
||||
func TestParseValidPrometheus(t *testing.T) {
|
||||
// Gauge value
|
||||
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
|
||||
metrics, err := Parse([]byte(validUniqueGauge), http.Header{}, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
|
||||
|
|
@ -58,7 +60,7 @@ func TestParseValidPrometheus(t *testing.T) {
|
|||
}, metrics[0].Tags())
|
||||
|
||||
// Counter value
|
||||
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
|
||||
metrics, err = Parse([]byte(validUniqueCounter), http.Header{}, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
|
||||
|
|
@ -69,7 +71,7 @@ func TestParseValidPrometheus(t *testing.T) {
|
|||
|
||||
// Summary data
|
||||
//SetDefaultTags(map[string]string{})
|
||||
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
|
||||
metrics, err = Parse([]byte(validUniqueSummary), http.Header{}, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
|
||||
|
|
@ -83,7 +85,7 @@ func TestParseValidPrometheus(t *testing.T) {
|
|||
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
|
||||
|
||||
// histogram data
|
||||
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
|
||||
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
|
||||
|
|
@ -103,3 +105,38 @@ func TestParseValidPrometheus(t *testing.T) {
|
|||
map[string]string{"verb": "POST", "resource": "bindings"},
|
||||
metrics[0].Tags())
|
||||
}
|
||||
|
||||
func TestMetricsWithTimestamp(t *testing.T) {
|
||||
testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC)
|
||||
testTimeUnix := testTime.UnixNano() / int64(time.Millisecond)
|
||||
metricsWithTimestamps := fmt.Sprintf(`
|
||||
# TYPE test_counter counter
|
||||
test_counter{label="test"} 1 %d
|
||||
`, testTimeUnix)
|
||||
|
||||
// IgnoreTimestamp is false
|
||||
metrics, err := Parse([]byte(metricsWithTimestamps), http.Header{}, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "test_counter", metrics[0].Name())
|
||||
assert.Equal(t, map[string]interface{}{
|
||||
"counter": float64(1),
|
||||
}, metrics[0].Fields())
|
||||
assert.Equal(t, map[string]string{
|
||||
"label": "test",
|
||||
}, metrics[0].Tags())
|
||||
assert.Equal(t, testTime, metrics[0].Time().UTC())
|
||||
|
||||
// IgnoreTimestamp is true
|
||||
metrics, err = Parse([]byte(metricsWithTimestamps), http.Header{}, true)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "test_counter", metrics[0].Name())
|
||||
assert.Equal(t, map[string]interface{}{
|
||||
"counter": float64(1),
|
||||
}, metrics[0].Fields())
|
||||
assert.Equal(t, map[string]string{
|
||||
"label": "test",
|
||||
}, metrics[0].Tags())
|
||||
assert.WithinDuration(t, time.Now(), metrics[0].Time().UTC(), 5*time.Second)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,6 +58,8 @@ type Prometheus struct {
|
|||
|
||||
URLTag string `toml:"url_tag"`
|
||||
|
||||
IgnoreTimestamp bool `toml:"ignore_timestamp"`
|
||||
|
||||
tls.ClientConfig
|
||||
|
||||
Log telegraf.Logger
|
||||
|
|
@ -101,6 +103,10 @@ var sampleConfig = `
|
|||
## Url tag name (tag containing scrapped url. optional, default is "url")
|
||||
# url_tag = "url"
|
||||
|
||||
## Whether the timestamp of the scraped metrics will be ignored.
|
||||
## If set to true, the gather time will be used.
|
||||
# ignore_timestamp = false
|
||||
|
||||
## An array of Kubernetes services to scrape metrics from.
|
||||
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
|
||||
|
||||
|
|
@ -414,10 +420,13 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
|
|||
}
|
||||
|
||||
if p.MetricVersion == 2 {
|
||||
parser := parser_v2.Parser{Header: resp.Header}
|
||||
parser := parser_v2.Parser{
|
||||
Header: resp.Header,
|
||||
IgnoreTimestamp: p.IgnoreTimestamp,
|
||||
}
|
||||
metrics, err = parser.Parse(body)
|
||||
} else {
|
||||
metrics, err = Parse(body, resp.Header)
|
||||
metrics, err = Parse(body, resp.Header, p.IgnoreTimestamp)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -242,6 +242,29 @@ func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
|
|||
assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
|
||||
}
|
||||
|
||||
func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, err := fmt.Fprintln(w, sampleTextFormat)
|
||||
require.NoError(t, err)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
p := &Prometheus{
|
||||
Log: testutil.Logger{},
|
||||
URLs: []string{ts.URL},
|
||||
URLTag: "url",
|
||||
IgnoreTimestamp: true,
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := acc.GatherError(p.Gather)
|
||||
require.NoError(t, err)
|
||||
|
||||
m, _ := acc.Get("test_metric")
|
||||
assert.WithinDuration(t, time.Now(), m.Time, 5*time.Second)
|
||||
}
|
||||
|
||||
func TestUnsupportedFieldSelector(t *testing.T) {
|
||||
fieldSelectorString := "spec.containerName=container"
|
||||
prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
|
||||
|
|
|
|||
|
|
@ -21,8 +21,9 @@ import (
|
|||
)
|
||||
|
||||
type Parser struct {
|
||||
DefaultTags map[string]string
|
||||
Header http.Header
|
||||
DefaultTags map[string]string
|
||||
Header http.Header
|
||||
IgnoreTimestamp bool
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
|
@ -65,14 +66,15 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
for _, m := range mf.Metric {
|
||||
// reading tags
|
||||
tags := common.MakeLabels(m, p.DefaultTags)
|
||||
t := p.GetTimestamp(m, now)
|
||||
|
||||
if mf.GetType() == dto.MetricType_SUMMARY {
|
||||
// summary metric
|
||||
telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), now)
|
||||
telegrafMetrics := makeQuantiles(m, tags, metricName, mf.GetType(), t)
|
||||
metrics = append(metrics, telegrafMetrics...)
|
||||
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
|
||||
// histogram metric
|
||||
telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), now)
|
||||
telegrafMetrics := makeBuckets(m, tags, metricName, mf.GetType(), t)
|
||||
metrics = append(metrics, telegrafMetrics...)
|
||||
} else {
|
||||
// standard metric
|
||||
|
|
@ -80,7 +82,6 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
fields := getNameAndValue(m, metricName)
|
||||
// converting to telegraf metric
|
||||
if len(fields) > 0 {
|
||||
t := getTimestamp(m, now)
|
||||
m := metric.New("prometheus", tags, fields, t, common.ValueType(mf.GetType()))
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
|
|
@ -113,10 +114,9 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
|
|||
}
|
||||
|
||||
// Get Quantiles for summary metric & Buckets for histogram
|
||||
func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
|
||||
func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric {
|
||||
var metrics []telegraf.Metric
|
||||
fields := make(map[string]interface{})
|
||||
t := getTimestamp(m, now)
|
||||
|
||||
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
|
||||
fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
|
||||
|
|
@ -137,10 +137,9 @@ func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, met
|
|||
}
|
||||
|
||||
// Get Buckets from histogram metric
|
||||
func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
|
||||
func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric {
|
||||
var metrics []telegraf.Metric
|
||||
fields := make(map[string]interface{})
|
||||
t := getTimestamp(m, now)
|
||||
|
||||
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
|
||||
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())
|
||||
|
|
@ -179,9 +178,9 @@ func getNameAndValue(m *dto.Metric, metricName string) map[string]interface{} {
|
|||
return fields
|
||||
}
|
||||
|
||||
func getTimestamp(m *dto.Metric, now time.Time) time.Time {
|
||||
func (p *Parser) GetTimestamp(m *dto.Metric, now time.Time) time.Time {
|
||||
var t time.Time
|
||||
if m.TimestampMs != nil && *m.TimestampMs > 0 {
|
||||
if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 {
|
||||
t = time.Unix(0, m.GetTimestampMs()*1000000)
|
||||
} else {
|
||||
t = now
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ func TestParsingValidGauge(t *testing.T) {
|
|||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||
}
|
||||
|
||||
func TestParsingValieCounter(t *testing.T) {
|
||||
func TestParsingValidCounter(t *testing.T) {
|
||||
expected := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
|
|
@ -340,6 +340,32 @@ test_counter{label="test"} 1 %d
|
|||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics())
|
||||
}
|
||||
|
||||
func TestMetricsWithoutIgnoreTimestamp(t *testing.T) {
|
||||
testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC)
|
||||
testTimeUnix := testTime.UnixNano() / int64(time.Millisecond)
|
||||
metricsWithTimestamps := fmt.Sprintf(`
|
||||
# TYPE test_counter counter
|
||||
test_counter{label="test"} 1 %d
|
||||
`, testTimeUnix)
|
||||
expected := testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"label": "test",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"test_counter": float64(1.0),
|
||||
},
|
||||
testTime,
|
||||
telegraf.Counter,
|
||||
)
|
||||
|
||||
parser := Parser{IgnoreTimestamp: true}
|
||||
metric, _ := parser.ParseLine(metricsWithTimestamps)
|
||||
|
||||
testutil.RequireMetricEqual(t, expected, metric, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||
assert.WithinDuration(t, time.Now(), metric.Time(), 5*time.Second)
|
||||
}
|
||||
|
||||
func parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
parser := Parser{}
|
||||
return parser.Parse(buf)
|
||||
|
|
|
|||
|
|
@ -156,6 +156,9 @@ type Config struct {
|
|||
// FormData configuration
|
||||
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
|
||||
|
||||
// Prometheus configuration
|
||||
PrometheusIgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"`
|
||||
|
||||
// Value configuration
|
||||
ValueFieldName string `toml:"value_field_name"`
|
||||
|
||||
|
|
@ -259,7 +262,10 @@ func NewParser(config *Config) (Parser, error) {
|
|||
config.FormUrlencodedTagKeys,
|
||||
)
|
||||
case "prometheus":
|
||||
parser, err = NewPrometheusParser(config.DefaultTags)
|
||||
parser, err = NewPrometheusParser(
|
||||
config.DefaultTags,
|
||||
config.PrometheusIgnoreTimestamp,
|
||||
)
|
||||
case "prometheusremotewrite":
|
||||
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
|
||||
case "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf":
|
||||
|
|
@ -378,9 +384,10 @@ func NewFormUrlencodedParser(
|
|||
}, nil
|
||||
}
|
||||
|
||||
func NewPrometheusParser(defaultTags map[string]string) (Parser, error) {
|
||||
func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) {
|
||||
return &prometheus.Parser{
|
||||
DefaultTags: defaultTags,
|
||||
DefaultTags: defaultTags,
|
||||
IgnoreTimestamp: ignoreTimestamp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue