fix: add push only updated values flag to histogram aggregator (#10515)
This commit is contained in:
parent
5adecc3cd9
commit
92d1b0efcf
|
|
@ -48,6 +48,10 @@ of the algorithm which is implemented in the Prometheus
|
||||||
## there are no changes in any buckets for this time interval. 0 == no expiration.
|
## there are no changes in any buckets for this time interval. 0 == no expiration.
|
||||||
# expiration_interval = "0m"
|
# expiration_interval = "0m"
|
||||||
|
|
||||||
|
## If true, aggregated histogram are pushed to output only if it was updated since
|
||||||
|
## previous push. Defaults to false.
|
||||||
|
# push_only_on_update = false
|
||||||
|
|
||||||
## Example config that aggregates all fields of the metric.
|
## Example config that aggregates all fields of the metric.
|
||||||
# [[aggregators.histogram.config]]
|
# [[aggregators.histogram.config]]
|
||||||
# ## Right borders of buckets (with +Inf implicitly added).
|
# ## Right borders of buckets (with +Inf implicitly added).
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ type HistogramAggregator struct {
|
||||||
ResetBuckets bool `toml:"reset"`
|
ResetBuckets bool `toml:"reset"`
|
||||||
Cumulative bool `toml:"cumulative"`
|
Cumulative bool `toml:"cumulative"`
|
||||||
ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"`
|
ExpirationInterval telegrafConfig.Duration `toml:"expiration_interval"`
|
||||||
|
PushOnlyOnUpdate bool `toml:"push_only_on_update"`
|
||||||
|
|
||||||
buckets bucketsByMetrics
|
buckets bucketsByMetrics
|
||||||
cache map[uint64]metricHistogramCollection
|
cache map[uint64]metricHistogramCollection
|
||||||
|
|
@ -55,6 +56,7 @@ type metricHistogramCollection struct {
|
||||||
name string
|
name string
|
||||||
tags map[string]string
|
tags map[string]string
|
||||||
expireTime time.Time
|
expireTime time.Time
|
||||||
|
updated bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// counts is the number of hits in the bucket
|
// counts is the number of hits in the bucket
|
||||||
|
|
@ -100,6 +102,10 @@ var sampleConfig = `
|
||||||
## there are no changes in any buckets for this time interval. 0 == no expiration.
|
## there are no changes in any buckets for this time interval. 0 == no expiration.
|
||||||
# expiration_interval = "0m"
|
# expiration_interval = "0m"
|
||||||
|
|
||||||
|
## If true, aggregated histogram are pushed to output only if it was updated since
|
||||||
|
## previous push. Defaults to false.
|
||||||
|
# push_only_on_update = false
|
||||||
|
|
||||||
## Example config that aggregates all fields of the metric.
|
## Example config that aggregates all fields of the metric.
|
||||||
# [[aggregators.histogram.config]]
|
# [[aggregators.histogram.config]]
|
||||||
# ## Right borders of buckets (with +Inf implicitly added).
|
# ## Right borders of buckets (with +Inf implicitly added).
|
||||||
|
|
@ -166,6 +172,7 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
||||||
if h.ExpirationInterval != 0 {
|
if h.ExpirationInterval != 0 {
|
||||||
agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval))
|
agr.expireTime = addTime.Add(time.Duration(h.ExpirationInterval))
|
||||||
}
|
}
|
||||||
|
agr.updated = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -182,6 +189,11 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
|
||||||
delete(h.cache, id)
|
delete(h.cache, id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if h.PushOnlyOnUpdate && !h.cache[id].updated {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
aggregate.updated = false
|
||||||
|
h.cache[id] = aggregate
|
||||||
for field, counts := range aggregate.histogramCollection {
|
for field, counts := range aggregate.histogramCollection {
|
||||||
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
|
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,16 +17,17 @@ type fields map[string]interface{}
|
||||||
type tags map[string]string
|
type tags map[string]string
|
||||||
|
|
||||||
// NewTestHistogram creates new test histogram aggregation with specified config
|
// NewTestHistogram creates new test histogram aggregation with specified config
|
||||||
func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
|
func NewTestHistogram(cfg []config, reset bool, cumulative bool, pushOnlyOnUpdate bool) telegraf.Aggregator {
|
||||||
return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, 0)
|
return NewTestHistogramWithExpirationInterval(cfg, reset, cumulative, pushOnlyOnUpdate, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator {
|
func NewTestHistogramWithExpirationInterval(cfg []config, reset bool, cumulative bool, pushOnlyOnUpdate bool, expirationInterval telegrafConfig.Duration) telegraf.Aggregator {
|
||||||
htm := NewHistogramAggregator()
|
htm := NewHistogramAggregator()
|
||||||
htm.Configs = cfg
|
htm.Configs = cfg
|
||||||
htm.ResetBuckets = reset
|
htm.ResetBuckets = reset
|
||||||
htm.Cumulative = cumulative
|
htm.Cumulative = cumulative
|
||||||
htm.ExpirationInterval = expirationInterval
|
htm.ExpirationInterval = expirationInterval
|
||||||
|
htm.PushOnlyOnUpdate = pushOnlyOnUpdate
|
||||||
|
|
||||||
return htm
|
return htm
|
||||||
}
|
}
|
||||||
|
|
@ -80,7 +81,7 @@ func BenchmarkApply(b *testing.B) {
|
||||||
func TestHistogram(t *testing.T) {
|
func TestHistogram(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, true)
|
histogram := NewTestHistogram(cfg, false, true, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
@ -98,11 +99,47 @@ func TestHistogram(t *testing.T) {
|
||||||
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf})
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHistogram tests metrics for one period, for one field and push only on histogram update
|
||||||
|
func TestHistogramPushOnUpdate(t *testing.T) {
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
|
histogram := NewTestHistogram(cfg, false, true, true)
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Reset()
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
require.Len(t, acc.Metrics, 6, "Incorrect number of metrics")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "20"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "30"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "40"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf})
|
||||||
|
|
||||||
|
acc.ClearMetrics()
|
||||||
|
histogram.Push(acc)
|
||||||
|
require.Len(t, acc.Metrics, 0, "Incorrect number of metrics")
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
require.Len(t, acc.Metrics, 6, "Incorrect number of metrics")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "20"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "30"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: "40"})
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(3)}, tags{bucketRightTag: bucketPosInf})
|
||||||
|
}
|
||||||
|
|
||||||
// TestHistogramNonCumulative tests metrics for one period and for one field
|
// TestHistogramNonCumulative tests metrics for one period and for one field
|
||||||
func TestHistogramNonCumulative(t *testing.T) {
|
func TestHistogramNonCumulative(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, false)
|
histogram := NewTestHistogram(cfg, false, false, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
@ -124,7 +161,7 @@ func TestHistogramNonCumulative(t *testing.T) {
|
||||||
func TestHistogramWithReset(t *testing.T) {
|
func TestHistogramWithReset(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg, true, true)
|
histogram := NewTestHistogram(cfg, true, true, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
@ -147,7 +184,7 @@ func TestHistogramWithAllFields(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
||||||
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, true)
|
histogram := NewTestHistogram(cfg, false, true, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
@ -177,7 +214,7 @@ func TestHistogramWithAllFieldsNonCumulative(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
||||||
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, false)
|
histogram := NewTestHistogram(cfg, false, false, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
@ -207,7 +244,7 @@ func TestHistogramWithAllFieldsNonCumulative(t *testing.T) {
|
||||||
func TestHistogramWithTwoPeriodsAndAllFields(t *testing.T) {
|
func TestHistogramWithTwoPeriodsAndAllFields(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, true)
|
histogram := NewTestHistogram(cfg, false, true, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
histogram.Add(firstMetric1)
|
histogram.Add(firstMetric1)
|
||||||
|
|
@ -246,7 +283,7 @@ func TestWrongBucketsOrder(t *testing.T) {
|
||||||
|
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg, false, true)
|
histogram := NewTestHistogram(cfg, false, true, false)
|
||||||
histogram.Add(firstMetric2)
|
histogram.Add(firstMetric2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -263,7 +300,7 @@ func TestHistogramMetricExpiration(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, telegrafConfig.Duration(30))
|
histogram := NewTestHistogramWithExpirationInterval(cfg, false, true, false, telegrafConfig.Duration(30))
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue