feat: aggregator histogram add expiration (#10520)
This commit is contained in:
parent
5c8751f97c
commit
5b58d0ded6
|
|
@ -3,8 +3,10 @@ package histogram
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
telegrafConfig "github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/aggregators"
|
"github.com/influxdata/telegraf/plugins/aggregators"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -22,9 +24,10 @@ const bucketNegInf = "-Inf"
|
||||||
|
|
||||||
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
|
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
|
||||||
type HistogramAggregator struct {
|
type HistogramAggregator struct {
|
||||||
Configs []config `toml:"config"`
|
Configs []config `toml:"config"`
|
||||||
ResetBuckets bool `toml:"reset"`
|
ResetBuckets bool `toml:"reset"`
|
||||||
Cumulative bool `toml:"cumulative"`
|
Cumulative bool `toml:"cumulative"`
|
||||||
|
ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"`
|
||||||
|
|
||||||
buckets bucketsByMetrics
|
buckets bucketsByMetrics
|
||||||
cache map[uint64]metricHistogramCollection
|
cache map[uint64]metricHistogramCollection
|
||||||
|
|
@ -51,6 +54,7 @@ type metricHistogramCollection struct {
|
||||||
histogramCollection map[string]counts
|
histogramCollection map[string]counts
|
||||||
name string
|
name string
|
||||||
tags map[string]string
|
tags map[string]string
|
||||||
|
expireTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// counts is the number of hits in the bucket
|
// counts is the number of hits in the bucket
|
||||||
|
|
@ -63,6 +67,8 @@ type groupedByCountFields struct {
|
||||||
fieldsWithCount map[string]int64
|
fieldsWithCount map[string]int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var timeNow = time.Now
|
||||||
|
|
||||||
// NewHistogramAggregator creates new histogram aggregator
|
// NewHistogramAggregator creates new histogram aggregator
|
||||||
func NewHistogramAggregator() *HistogramAggregator {
|
func NewHistogramAggregator() *HistogramAggregator {
|
||||||
h := &HistogramAggregator{
|
h := &HistogramAggregator{
|
||||||
|
|
@ -90,6 +96,10 @@ var sampleConfig = `
|
||||||
## Defaults to true.
|
## Defaults to true.
|
||||||
cumulative = true
|
cumulative = true
|
||||||
|
|
||||||
|
## Expiration interval for each histogram. The histogram will be expired if
|
||||||
|
## there are no changes in any buckets for this time interval. 0 == no expiration.
|
||||||
|
# expiration_interval = "0m"
|
||||||
|
|
||||||
## Example config that aggregates all fields of the metric.
|
## Example config that aggregates all fields of the metric.
|
||||||
# [[aggregators.histogram.config]]
|
# [[aggregators.histogram.config]]
|
||||||
# ## Right borders of buckets (with +Inf implicitly added).
|
# ## Right borders of buckets (with +Inf implicitly added).
|
||||||
|
|
@ -119,6 +129,8 @@ func (h *HistogramAggregator) Description() string {
|
||||||
|
|
||||||
// Add adds new hit to the buckets
|
// Add adds new hit to the buckets
|
||||||
func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
||||||
|
addTime := timeNow()
|
||||||
|
|
||||||
bucketsByField := make(map[string][]float64)
|
bucketsByField := make(map[string][]float64)
|
||||||
for field := range in.Fields() {
|
for field := range in.Fields() {
|
||||||
buckets := h.getBuckets(in.Name(), field)
|
buckets := h.getBuckets(in.Name(), field)
|
||||||
|
|
@ -151,6 +163,9 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
||||||
index := sort.SearchFloat64s(buckets, value)
|
index := sort.SearchFloat64s(buckets, value)
|
||||||
agr.histogramCollection[field][index]++
|
agr.histogramCollection[field][index]++
|
||||||
}
|
}
|
||||||
|
if h.ExpirationInterval != 0 {
|
||||||
|
agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -160,8 +175,13 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
||||||
// Push returns histogram values for metrics
|
// Push returns histogram values for metrics
|
||||||
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
|
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
|
||||||
metricsWithGroupedFields := []groupedByCountFields{}
|
metricsWithGroupedFields := []groupedByCountFields{}
|
||||||
|
now := timeNow()
|
||||||
|
|
||||||
for _, aggregate := range h.cache {
|
for id, aggregate := range h.cache {
|
||||||
|
if h.ExpirationInterval != 0 && now.After(aggregate.expireTime) {
|
||||||
|
delete(h.cache, id)
|
||||||
|
continue
|
||||||
|
}
|
||||||
for field, counts := range aggregate.histogramCollection {
|
for field, counts := range aggregate.histogramCollection {
|
||||||
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
|
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
telegrafConfig "github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
@ -17,10 +18,15 @@ type tags map[string]string
|
||||||
|
|
||||||
// NewTestHistogram creates new test histogram aggregation with specified config
|
// NewTestHistogram creates new test histogram aggregation with specified config
|
||||||
func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
|
func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
|
||||||
|
return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator {
|
||||||
htm := NewHistogramAggregator()
|
htm := NewHistogramAggregator()
|
||||||
htm.Configs = cfg
|
htm.Configs = cfg
|
||||||
htm.ResetBuckets = reset
|
htm.ResetBuckets = reset
|
||||||
htm.Cumulative = cumulative
|
htm.Cumulative = cumulative
|
||||||
|
htm.ExpirationInterval = expirationInterval
|
||||||
|
|
||||||
return htm
|
return htm
|
||||||
}
|
}
|
||||||
|
|
@ -244,6 +250,37 @@ func TestWrongBucketsOrder(t *testing.T) {
|
||||||
histogram.Add(firstMetric2)
|
histogram.Add(firstMetric2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHistogram tests two metrics getting added and metric expiration
|
||||||
|
func TestHistogramMetricExpiration(t *testing.T) {
|
||||||
|
currentTime := time.Unix(10, 0)
|
||||||
|
timeNow = func() time.Time {
|
||||||
|
return currentTime
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
timeNow = time.Now
|
||||||
|
}()
|
||||||
|
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
|
histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30))
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
currentTime = time.Unix(41, 0)
|
||||||
|
histogram.Add(secondMetric)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
require.Len(t, acc.Metrics, 6, "Incorrect number of metrics")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "0"})
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "4"})
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "10"})
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "23"})
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "30"})
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: bucketPosInf})
|
||||||
|
}
|
||||||
|
|
||||||
// assertContainsTaggedField is help functions to test histogram data
|
// assertContainsTaggedField is help functions to test histogram data
|
||||||
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) {
|
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) {
|
||||||
acc.Lock()
|
acc.Lock()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue