From b658b2d4030a337fb040260246e2983093c9056a Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 9 May 2024 11:07:10 -0400 Subject: [PATCH] feat(aggregators.merge): Allow to round metric timestamps (#15319) --- plugins/aggregators/merge/README.md | 6 + plugins/aggregators/merge/merge.go | 16 +- plugins/aggregators/merge/merge_test.go | 245 ++++++++++++++++++------ plugins/aggregators/merge/sample.conf | 6 + 4 files changed, 208 insertions(+), 65 deletions(-) diff --git a/plugins/aggregators/merge/README.md b/plugins/aggregators/merge/README.md index b1d36683d..d0520af81 100644 --- a/plugins/aggregators/merge/README.md +++ b/plugins/aggregators/merge/README.md @@ -21,6 +21,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Merge metrics into multifield metrics by series key [[aggregators.merge]] + ## Precision to round the metric timestamp to + ## This is useful for cases where metrics to merge arrive within a small + ## interval and thus vary in timestamp. The timestamp of the resulting metric + ## is also rounded. + # round_timestamp_to = "1ns" + ## If true, the original metric will be dropped by the ## aggregator and will not get sent to the output plugins. drop_original = true diff --git a/plugins/aggregators/merge/merge.go b/plugins/aggregators/merge/merge.go index 9d70ba7ad..e4818ff2a 100644 --- a/plugins/aggregators/merge/merge.go +++ b/plugins/aggregators/merge/merge.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/aggregators" ) @@ -14,7 +15,8 @@ import ( var sampleConfig string type Merge struct { - grouper *metric.SeriesGrouper + RoundTimestamp config.Duration `toml:"round_timestamp_to"` + grouper *metric.SeriesGrouper } func (*Merge) SampleConfig() string { @@ -27,7 +29,17 @@ func (a *Merge) Init() error { } func (a *Merge) Add(m telegraf.Metric) { - a.grouper.AddMetric(m) + gm := m + if a.RoundTimestamp > 0 { + if unwrapped, ok := m.(telegraf.UnwrappableMetric); ok { + gm = unwrapped.Unwrap().Copy() + } else { + gm = m.Copy() + } + ts := gm.Time() + gm.SetTime(ts.Round(time.Duration(a.RoundTimestamp))) + } + a.grouper.AddMetric(gm) } func (a *Merge) Push(acc telegraf.Accumulator) { diff --git a/plugins/aggregators/merge/merge_test.go b/plugins/aggregators/merge/merge_test.go index 0569b03f0..69b27c867 100644 --- a/plugins/aggregators/merge/merge_test.go +++ b/plugins/aggregators/merge/merge_test.go @@ -7,15 +7,14 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) func TestSimple(t *testing.T) { plugin := &Merge{} - - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) plugin.Add( testutil.MustMetric( @@ -29,8 +28,6 @@ func TestSimple(t *testing.T) { time.Unix(0, 0), ), ) - require.NoError(t, err) - plugin.Add( testutil.MustMetric( "cpu", @@ -43,7 +40,6 @@ func TestSimple(t *testing.T) { time.Unix(0, 0), ), ) - require.NoError(t, err) var acc testutil.Accumulator plugin.Push(&acc) @@ -67,9 +63,7 @@ func TestSimple(t *testing.T) { func TestNanosecondPrecision(t *testing.T) { plugin := &Merge{} - - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) plugin.Add( testutil.MustMetric( @@ -83,7 +77,6 @@ func TestNanosecondPrecision(t *testing.T) { time.Unix(0, 1), ), ) - require.NoError(t, err) plugin.Add( testutil.MustMetric( @@ -97,7 +90,6 @@ func TestNanosecondPrecision(t *testing.T) { time.Unix(0, 1), ), ) - require.NoError(t, err) var acc testutil.Accumulator acc.SetPrecision(time.Second) @@ -120,11 +112,120 @@ func TestNanosecondPrecision(t *testing.T) { testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } +func TestNoRounding(t *testing.T) { + plugin := &Merge{} + require.NoError(t, plugin.Init()) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 23, + }, + time.Unix(0, 1), + ), + ) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 2), + ), + ) + + var acc testutil.Accumulator + acc.SetPrecision(time.Second) + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 23, + }, + time.Unix(0, 1), + ), + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 2), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestWithRounding(t *testing.T) { + plugin := &Merge{RoundTimestamp: config.Duration(10 * time.Nanosecond)} + require.NoError(t, plugin.Init()) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 23, + }, + time.Unix(0, 1), + ), + ) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 2), + ), + ) + + var acc testutil.Accumulator + acc.SetPrecision(time.Second) + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 23, + "time_guest": 42, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + func TestReset(t *testing.T) { plugin := &Merge{} - - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) plugin.Add( testutil.MustMetric( @@ -138,7 +239,6 @@ func TestReset(t *testing.T) { time.Unix(0, 0), ), ) - require.NoError(t, err) var acc testutil.Accumulator plugin.Push(&acc) @@ -157,7 +257,6 @@ func TestReset(t *testing.T) { time.Unix(0, 0), ), ) - require.NoError(t, err) plugin.Push(&acc) @@ -187,65 +286,85 @@ func TestReset(t *testing.T) { testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } -var m1 = metric.New( - "mymetric", - map[string]string{ - "host": "host.example.com", - "mykey": "myvalue", - "another key": "another value", - }, - map[string]interface{}{ - "f1": 1, - "f2": 2, - "f3": 3, - "f4": 4, - "f5": 5, - "f6": 6, - "f7": 7, - "f8": 8, - }, - time.Now(), -) -var m2 = metric.New( - "mymetric", - map[string]string{ - "host": "host.example.com", - "mykey": "myvalue", - "another key": "another value", - }, - map[string]interface{}{ - "f8": 8, - "f9": 9, - "f10": 10, - "f11": 11, - "f12": 12, - "f13": 13, - "f14": 14, - "f15": 15, - "f16": 16, - }, - m1.Time(), -) - func BenchmarkMergeOne(b *testing.B) { var merger Merge - err := merger.Init() - require.NoError(b, err) - var acc testutil.NopAccumulator + require.NoError(b, merger.Init()) + m := metric.New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f1": 1, + "f2": 2, + "f3": 3, + "f4": 4, + "f5": 5, + "f6": 6, + "f7": 7, + "f8": 8, + }, + time.Now(), + ) + + var acc testutil.NopAccumulator for n := 0; n < b.N; n++ { merger.Reset() - merger.Add(m1) + merger.Add(m) merger.Push(&acc) } } func BenchmarkMergeTwo(b *testing.B) { var merger Merge - err := merger.Init() - require.NoError(b, err) - var acc testutil.NopAccumulator + require.NoError(b, merger.Init()) + now := time.Now() + m1 := metric.New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f1": 1, + "f2": 2, + "f3": 3, + "f4": 4, + "f5": 5, + "f6": 6, + "f7": 7, + "f8": 8, + }, + now, + ) + + m2 := metric.New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f8": 8, + "f9": 9, + "f10": 10, + "f11": 11, + "f12": 12, + "f13": 13, + "f14": 14, + "f15": 15, + "f16": 16, + }, + now, + ) + + var acc testutil.NopAccumulator for n := 0; n < b.N; n++ { merger.Reset() merger.Add(m1) diff --git a/plugins/aggregators/merge/sample.conf b/plugins/aggregators/merge/sample.conf index 146b52633..b012a50a5 100644 --- a/plugins/aggregators/merge/sample.conf +++ b/plugins/aggregators/merge/sample.conf @@ -1,5 +1,11 @@ # Merge metrics into multifield metrics by series key [[aggregators.merge]] + ## Precision to round the metric timestamp to + ## This is useful for cases where metrics to merge arrive within a small + ## interval and thus vary in timestamp. The timestamp of the resulting metric + ## is also rounded. + # round_timestamp_to = "1ns" + ## If true, the original metric will be dropped by the ## aggregator and will not get sent to the output plugins. drop_original = true