From 92d1b0efcf268d6f9a829b48d460a4acf8520f6b Mon Sep 17 00:00:00 2001 From: Eugene Komarov Date: Fri, 25 Feb 2022 04:04:58 +0600 Subject: [PATCH] fix: add push only updated values flag to histogram aggregator (#10515) --- plugins/aggregators/histogram/README.md | 4 ++ plugins/aggregators/histogram/histogram.go | 12 ++++ .../aggregators/histogram/histogram_test.go | 59 +++++++++++++++---- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index aad2eb431..2eeb33fc6 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -48,6 +48,10 @@ of the algorithm which is implemented in the Prometheus ## there are no changes in any buckets for this time interval. 0 == no expiration. # expiration_interval = "0m" + ## If true, aggregated histogram are pushed to output only if it was updated since + ## previous push. Defaults to false. + # push_only_on_update = false + ## Example config that aggregates all fields of the metric. # [[aggregators.histogram.config]] # ## Right borders of buckets (with +Inf implicitly added). diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index f4ffff5ff..a78e25d7f 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -28,6 +28,7 @@ type HistogramAggregator struct { ResetBuckets bool `toml:"reset"` Cumulative bool `toml:"cumulative"` ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"` + PushOnlyOnUpdate bool `toml:"push_only_on_update"` buckets bucketsByMetrics cache map[uint64]metricHistogramCollection @@ -55,6 +56,7 @@ type metricHistogramCollection struct { name string tags map[string]string expireTime time.Time + updated bool } // counts is the number of hits in the bucket @@ -100,6 +102,10 @@ var sampleConfig = ` ## there are no changes in any buckets for this time interval. 0 == no expiration. # expiration_interval = "0m" + ## If true, aggregated histogram are pushed to output only if it was updated since + ## previous push. Defaults to false. + # push_only_on_update = false + ## Example config that aggregates all fields of the metric. # [[aggregators.histogram.config]] # ## Right borders of buckets (with +Inf implicitly added). @@ -166,6 +172,7 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { if h.ExpirationInterval != 0 { agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval)) } + agr.updated = true } } @@ -182,6 +189,11 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { delete(h.cache, id) continue } + if h.PushOnlyOnUpdate && !h.cache[id].updated { + continue + } + aggregate.updated = false + h.cache[id] = aggregate for field, counts := range aggregate.histogramCollection { h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) } diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index c63b46d0a..dc78587b4 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -17,16 +17,17 @@ type fields map[string]interface{} type tags map[string]string // NewTestHistogram creates new test histogram aggregation with specified config -func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator { - return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0) +func NewTestHistogram(cfg []config, reset bool, cumulative bool, pushOnlyOnUpdate bool) telegraf.Aggregator { + return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, pushOnlyOnUpdate, 0) } -func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator { +func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, pushOnlyOnUpdate bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator { htm := NewHistogramAggregator() htm.Configs = cfg htm.ResetBuckets = reset htm.Cumulative = cumulative htm.ExpirationInterval = expirationInterval + htm.PushOnlyOnUpdate = pushOnlyOnUpdate return htm } @@ -80,7 +81,7 @@ func BenchmarkApply(b *testing.B) { func TestHistogram(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg, false, true) + histogram := NewTestHistogram(cfg, false, true, false) acc := &testutil.Accumulator{} @@ -98,11 +99,47 @@ func TestHistogram(t *testing.T) { assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf}) } +// TestHistogram tests metrics for one period, for one field and push only on histogram update +func TestHistogramPushOnUpdate(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg, false, true, true) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Reset() + histogram.Add(firstMetric2) + histogram.Push(acc) + + require.Len(t, acc.Metrics, 6, "Incorrect number of metrics") + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "20"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "30"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "40"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf}) + + acc.ClearMetrics() + histogram.Push(acc) + require.Len(t, acc.Metrics, 0, "Incorrect number of metrics") + histogram.Add(firstMetric2) + histogram.Push(acc) + + require.Len(t, acc.Metrics, 6, "Incorrect number of metrics") + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "20"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "30"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "40"}) + assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: bucketPosInf}) +} + // TestHistogramNonCumulative tests metrics for one period and for one field func TestHistogramNonCumulative(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg, false, false) + histogram := NewTestHistogram(cfg, false, false, false) acc := &testutil.Accumulator{} @@ -124,7 +161,7 @@ func TestHistogramNonCumulative(t *testing.T) { func TestHistogramWithReset(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg, true, true) + histogram := NewTestHistogram(cfg, true, true, false) acc := &testutil.Accumulator{} @@ -147,7 +184,7 @@ func TestHistogramWithAllFields(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) - histogram := NewTestHistogram(cfg, false, true) + histogram := NewTestHistogram(cfg, false, true, false) acc := &testutil.Accumulator{} @@ -177,7 +214,7 @@ func TestHistogramWithAllFieldsNonCumulative(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) - histogram := NewTestHistogram(cfg, false, false) + histogram := NewTestHistogram(cfg, false, false, false) acc := &testutil.Accumulator{} @@ -207,7 +244,7 @@ func TestHistogramWithAllFieldsNonCumulative(t *testing.T) { func TestHistogramWithTwoPeriodsAndAllFields(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg, false, true) + histogram := NewTestHistogram(cfg, false, true, false) acc := &testutil.Accumulator{} histogram.Add(firstMetric1) @@ -246,7 +283,7 @@ func TestWrongBucketsOrder(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg, false, true) + histogram := NewTestHistogram(cfg, false, true, false) histogram.Add(firstMetric2) } @@ -263,7 +300,7 @@ func TestHistogramMetricExpiration(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) - histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30)) + histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, false, telegrafConfig.Duration(30)) acc := &testutil.Accumulator{}