From 5b58d0ded61bde4376f00bfa38039035f8515a63 Mon Sep 17 00:00:00 2001 From: Eugene Komarov Date: Fri, 4 Feb 2022 04:05:33 +0600 Subject: [PATCH] feat: aggregator histogram add expiration (#10520) --- plugins/aggregators/histogram/histogram.go | 28 ++++++++++++-- .../aggregators/histogram/histogram_test.go | 37 +++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index dab524d62..f4ffff5ff 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -3,8 +3,10 @@ package histogram import ( "sort" "strconv" + "time" "github.com/influxdata/telegraf" + telegrafConfig "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/aggregators" ) @@ -22,9 +24,10 @@ const bucketNegInf = "-Inf" // HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics type HistogramAggregator struct { - Configs []config `toml:"config"` - ResetBuckets bool `toml:"reset"` - Cumulative bool `toml:"cumulative"` + Configs []config `toml:"config"` + ResetBuckets bool `toml:"reset"` + Cumulative bool `toml:"cumulative"` + ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"` buckets bucketsByMetrics cache map[uint64]metricHistogramCollection @@ -51,6 +54,7 @@ type metricHistogramCollection struct { histogramCollection map[string]counts name string tags map[string]string + expireTime time.Time } // counts is the number of hits in the bucket @@ -63,6 +67,8 @@ type groupedByCountFields struct { fieldsWithCount map[string]int64 } +var timeNow = time.Now + // NewHistogramAggregator creates new histogram aggregator func NewHistogramAggregator() *HistogramAggregator { h := &HistogramAggregator{ @@ -90,6 +96,10 @@ var sampleConfig = ` ## Defaults to true. cumulative = true + ## Expiration interval for each histogram. The histogram will be expired if + ## there are no changes in any buckets for this time interval. 0 == no expiration. + # expiration_interval = "0m" + ## Example config that aggregates all fields of the metric. # [[aggregators.histogram.config]] # ## Right borders of buckets (with +Inf implicitly added). @@ -119,6 +129,8 @@ func (h *HistogramAggregator) Description() string { // Add adds new hit to the buckets func (h *HistogramAggregator) Add(in telegraf.Metric) { + addTime := timeNow() + bucketsByField := make(map[string][]float64) for field := range in.Fields() { buckets := h.getBuckets(in.Name(), field) @@ -151,6 +163,9 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { index := sort.SearchFloat64s(buckets, value) agr.histogramCollection[field][index]++ } + if h.ExpirationInterval != 0 { + agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval)) + } } } @@ -160,8 +175,13 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { // Push returns histogram values for metrics func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { metricsWithGroupedFields := []groupedByCountFields{} + now := timeNow() - for _, aggregate := range h.cache { + for id, aggregate := range h.cache { + if h.ExpirationInterval != 0 && now.After(aggregate.expireTime) { + delete(h.cache, id) + continue + } for field, counts := range aggregate.histogramCollection { h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) } diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index ad24d5b33..c63b46d0a 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + telegrafConfig "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -17,10 +18,15 @@ type tags map[string]string // NewTestHistogram creates new test histogram aggregation with specified config func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator { + return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0) +} + +func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator { htm := NewHistogramAggregator() htm.Configs = cfg htm.ResetBuckets = reset htm.Cumulative = cumulative + htm.ExpirationInterval = expirationInterval return htm } @@ -244,6 +250,37 @@ func TestWrongBucketsOrder(t *testing.T) { histogram.Add(firstMetric2) } +// TestHistogram tests two metrics getting added and metric expiration +func TestHistogramMetricExpiration(t *testing.T) { + currentTime := time.Unix(10, 0) + timeNow = func() time.Time { + return currentTime + } + defer func() { + timeNow = time.Now + }() + + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) + histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30)) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + currentTime = time.Unix(41, 0) + histogram.Add(secondMetric) + histogram.Push(acc) + + require.Len(t, acc.Metrics, 6, "Incorrect number of metrics") + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "0"}) + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "4"}) + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "10"}) + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "23"}) + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "30"}) + assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: bucketPosInf}) +} + // assertContainsTaggedField is help functions to test histogram data func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) { acc.Lock()