Make prometheus serializer update timestamps and expiration time as new data arrives (#9139)

This commit is contained in:
Jake McCrary 2021-09-02 09:56:45 -05:00 committed by GitHub
parent 167b6e0075
commit 514a942a6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 428 additions and 1 deletions

View File

@ -8,7 +8,11 @@ use the `metric_version = 2` option in order to properly round trip metrics.
not be correct if the metric spans multiple batches. This issue can be
somewhat, but not fully, mitigated by using outputs that support writing in
"batch format". When using histogram and summary types, it is recommended to
use only the `prometheus_client` output.
use only the `prometheus_client` output. Histogram and Summary types
also update their expiration time based on the most recently received data.
If incoming metrics stop updating specific buckets or quantiles but continue
reporting others, every bucket/quantile will continue to exist.
### Configuration

View File

@ -241,6 +241,9 @@ func (c *Collection) Add(metric telegraf.Metric, now time.Time) {
AddTime: now,
Histogram: &Histogram{},
}
} else {
m.Time = metric.Time()
m.AddTime = now
}
switch {
case strings.HasSuffix(field.Key, "_bucket"):
@ -289,6 +292,9 @@ func (c *Collection) Add(metric telegraf.Metric, now time.Time) {
AddTime: now,
Summary: &Summary{},
}
} else {
m.Time = metric.Time()
m.AddTime = now
}
switch {
case strings.HasSuffix(field.Key, "_sum"):

View File

@ -302,6 +302,117 @@ func TestCollectionExpire(t *testing.T) {
},
},
},
{
name: "entire histogram expires",
now: time.Unix(20, 0),
age: 10 * time.Second,
input: []Input{
{
metric: testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"http_request_duration_seconds_sum": 10.0,
"http_request_duration_seconds_count": 2,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"le": "0.05"},
map[string]interface{}{
"http_request_duration_seconds_bucket": 1.0,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"le": "+Inf"},
map[string]interface{}{
"http_request_duration_seconds_bucket": 1.0,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
},
},
expected: []*dto.MetricFamily{},
},
{
name: "histogram does not expire because of addtime from bucket",
now: time.Unix(20, 0),
age: 10 * time.Second,
input: []Input{
{
metric: testutil.MustMetric(
"prometheus",
map[string]string{"le": "+Inf"},
map[string]interface{}{
"http_request_duration_seconds_bucket": 1.0,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"http_request_duration_seconds_sum": 10.0,
"http_request_duration_seconds_count": 2,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"le": "0.05"},
map[string]interface{}{
"http_request_duration_seconds_bucket": 1.0,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(15, 0), // More recent addtime causes entire metric to stay valid
},
},
expected: []*dto.MetricFamily{
{
Name: proto.String("http_request_duration_seconds"),
Help: proto.String(helpString),
Type: dto.MetricType_HISTOGRAM.Enum(),
Metric: []*dto.Metric{
{
Label: []*dto.LabelPair{},
Histogram: &dto.Histogram{
SampleCount: proto.Uint64(2),
SampleSum: proto.Float64(10.0),
Bucket: []*dto.Bucket{
{
UpperBound: proto.Float64(math.Inf(1)),
CumulativeCount: proto.Uint64(1),
},
{
UpperBound: proto.Float64(0.05),
CumulativeCount: proto.Uint64(1),
},
},
},
},
},
},
},
},
{
name: "summary quantile updates",
now: time.Unix(0, 0),
@ -379,6 +490,106 @@ func TestCollectionExpire(t *testing.T) {
},
},
},
{
name: "Entire summary expires",
now: time.Unix(20, 0),
age: 10 * time.Second,
input: []Input{
{
metric: testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"rpc_duration_seconds_sum": 1.0,
"rpc_duration_seconds_count": 1,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"quantile": "0.01"},
map[string]interface{}{
"rpc_duration_seconds": 1.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(0, 0),
},
},
expected: []*dto.MetricFamily{},
},
{
name: "summary does not expire because of quantile addtime",
now: time.Unix(20, 0),
age: 10 * time.Second,
input: []Input{
{
metric: testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"rpc_duration_seconds_sum": 1.0,
"rpc_duration_seconds_count": 1,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"quantile": "0.5"},
map[string]interface{}{
"rpc_duration_seconds": 10.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"quantile": "0.01"},
map[string]interface{}{
"rpc_duration_seconds": 1.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(15, 0), // Recent addtime keeps entire metric around
},
},
expected: []*dto.MetricFamily{
{
Name: proto.String("rpc_duration_seconds"),
Help: proto.String(helpString),
Type: dto.MetricType_SUMMARY.Enum(),
Metric: []*dto.Metric{
{
Label: []*dto.LabelPair{},
Summary: &dto.Summary{
SampleSum: proto.Float64(1),
SampleCount: proto.Uint64(1),
Quantile: []*dto.Quantile{
{
Quantile: proto.Float64(0.5),
Value: proto.Float64(10),
},
{
Quantile: proto.Float64(0.01),
Value: proto.Float64(1),
},
},
},
},
},
},
},
},
{
name: "expire based on add time",
now: time.Unix(20, 0),
@ -425,3 +636,209 @@ func TestCollectionExpire(t *testing.T) {
})
}
}
// TestExportTimestamps feeds a collection created with timestamp export
// enabled two successive rounds of histogram and summary samples, and checks
// that the resulting protobuf output carries the values and the timestamp of
// the second round.
func TestExportTimestamps(t *testing.T) {
	// Every sample is added at the same instant; the two reporting rounds
	// differ only in the timestamp carried by the metric itself.
	addedAt := time.Unix(23, 0)
	firstInterval := time.Unix(15, 0)
	secondInterval := time.Unix(20, 0)

	// Timestamp the exported metric is expected to carry, in milliseconds.
	wantTimestampMs := secondInterval.UnixNano() / int64(time.Millisecond)

	// histogramSample builds one "prometheus" histogram input added at addedAt.
	histogramSample := func(tags map[string]string, fields map[string]interface{}, ts time.Time) Input {
		return Input{
			metric:  testutil.MustMetric("prometheus", tags, fields, ts, telegraf.Histogram),
			addtime: addedAt,
		}
	}

	// summarySample builds one "prometheus" summary input added at addedAt.
	summarySample := func(tags map[string]string, fields map[string]interface{}, ts time.Time) Input {
		return Input{
			metric:  testutil.MustMetric("prometheus", tags, fields, ts, telegraf.Summary),
			addtime: addedAt,
		}
	}

	type testCase struct {
		name     string
		now      time.Time
		age      time.Duration
		input    []Input
		expected []*dto.MetricFamily
	}

	cases := []testCase{
		{
			name: "histogram bucket updates",
			now:  time.Unix(23, 0),
			age:  10 * time.Second,
			input: []Input{
				// First interval.
				histogramSample(
					map[string]string{},
					map[string]interface{}{
						"http_request_duration_seconds_sum":   10.0,
						"http_request_duration_seconds_count": 2,
					},
					firstInterval,
				),
				histogramSample(
					map[string]string{"le": "0.05"},
					map[string]interface{}{
						"http_request_duration_seconds_bucket": 1.0,
					},
					firstInterval,
				),
				histogramSample(
					map[string]string{"le": "+Inf"},
					map[string]interface{}{
						"http_request_duration_seconds_bucket": 1.0,
					},
					firstInterval,
				),
				// Next interval: same series, newer metric timestamp.
				histogramSample(
					map[string]string{},
					map[string]interface{}{
						"http_request_duration_seconds_sum":   20.0,
						"http_request_duration_seconds_count": 4,
					},
					secondInterval,
				),
				histogramSample(
					map[string]string{"le": "0.05"},
					map[string]interface{}{
						"http_request_duration_seconds_bucket": 2.0,
					},
					secondInterval,
				),
				histogramSample(
					map[string]string{"le": "+Inf"},
					map[string]interface{}{
						"http_request_duration_seconds_bucket": 2.0,
					},
					secondInterval,
				),
			},
			expected: []*dto.MetricFamily{
				{
					Name: proto.String("http_request_duration_seconds"),
					Help: proto.String(helpString),
					Type: dto.MetricType_HISTOGRAM.Enum(),
					Metric: []*dto.Metric{
						{
							Label:       []*dto.LabelPair{},
							TimestampMs: proto.Int64(wantTimestampMs),
							Histogram: &dto.Histogram{
								SampleCount: proto.Uint64(4),
								SampleSum:   proto.Float64(20.0),
								Bucket: []*dto.Bucket{
									{
										UpperBound:      proto.Float64(0.05),
										CumulativeCount: proto.Uint64(2),
									},
									{
										UpperBound:      proto.Float64(math.Inf(1)),
										CumulativeCount: proto.Uint64(2),
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "summary quantile updates",
			now:  time.Unix(23, 0),
			age:  10 * time.Second,
			input: []Input{
				// First interval.
				summarySample(
					map[string]string{},
					map[string]interface{}{
						"rpc_duration_seconds_sum":   1.0,
						"rpc_duration_seconds_count": 1,
					},
					firstInterval,
				),
				summarySample(
					map[string]string{"quantile": "0.01"},
					map[string]interface{}{
						"rpc_duration_seconds": 1.0,
					},
					firstInterval,
				),
				// Updated summary: same series, newer metric timestamp.
				summarySample(
					map[string]string{},
					map[string]interface{}{
						"rpc_duration_seconds_sum":   2.0,
						"rpc_duration_seconds_count": 2,
					},
					secondInterval,
				),
				summarySample(
					map[string]string{"quantile": "0.01"},
					map[string]interface{}{
						"rpc_duration_seconds": 2.0,
					},
					secondInterval,
				),
			},
			expected: []*dto.MetricFamily{
				{
					Name: proto.String("rpc_duration_seconds"),
					Help: proto.String(helpString),
					Type: dto.MetricType_SUMMARY.Enum(),
					Metric: []*dto.Metric{
						{
							Label:       []*dto.LabelPair{},
							TimestampMs: proto.Int64(wantTimestampMs),
							Summary: &dto.Summary{
								SampleCount: proto.Uint64(2),
								SampleSum:   proto.Float64(2.0),
								Quantile: []*dto.Quantile{
									{
										Quantile: proto.Float64(0.01),
										Value:    proto.Float64(2),
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			coll := NewCollection(FormatConfig{TimestampExport: ExportTimestamp})
			for _, in := range tc.input {
				coll.Add(in.metric, in.addtime)
			}
			coll.Expire(tc.now, tc.age)

			got := coll.GetProto()
			require.Equal(t, tc.expected, got)
		})
	}
}