feat(aggregators.merge): Allow to round metric timestamps (#15319)

This commit is contained in:
Sven Rebhan 2024-05-09 11:07:10 -04:00 committed by GitHub
parent 43c8db95d8
commit b658b2d403
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 208 additions and 65 deletions

View File

@ -21,6 +21,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Merge metrics into multifield metrics by series key
[[aggregators.merge]]
## Precision to round the metric timestamp to
## This is useful for cases where metrics to merge arrive within a small
## interval and thus vary in timestamp. The timestamp of the resulting metric
## is also rounded.
# round_timestamp_to = "1ns"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = true

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/aggregators"
)
@ -14,7 +15,8 @@ import (
var sampleConfig string
type Merge struct {
grouper *metric.SeriesGrouper
RoundTimestamp config.Duration `toml:"round_timestamp_to"`
grouper *metric.SeriesGrouper
}
func (*Merge) SampleConfig() string {
@ -27,7 +29,17 @@ func (a *Merge) Init() error {
}
func (a *Merge) Add(m telegraf.Metric) {
a.grouper.AddMetric(m)
gm := m
if a.RoundTimestamp > 0 {
if unwrapped, ok := m.(telegraf.UnwrappableMetric); ok {
gm = unwrapped.Unwrap().Copy()
} else {
gm = m.Copy()
}
ts := gm.Time()
gm.SetTime(ts.Round(time.Duration(a.RoundTimestamp)))
}
a.grouper.AddMetric(gm)
}
func (a *Merge) Push(acc telegraf.Accumulator) {

View File

@ -7,15 +7,14 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
func TestSimple(t *testing.T) {
plugin := &Merge{}
err := plugin.Init()
require.NoError(t, err)
require.NoError(t, plugin.Init())
plugin.Add(
testutil.MustMetric(
@ -29,8 +28,6 @@ func TestSimple(t *testing.T) {
time.Unix(0, 0),
),
)
require.NoError(t, err)
plugin.Add(
testutil.MustMetric(
"cpu",
@ -43,7 +40,6 @@ func TestSimple(t *testing.T) {
time.Unix(0, 0),
),
)
require.NoError(t, err)
var acc testutil.Accumulator
plugin.Push(&acc)
@ -67,9 +63,7 @@ func TestSimple(t *testing.T) {
func TestNanosecondPrecision(t *testing.T) {
plugin := &Merge{}
err := plugin.Init()
require.NoError(t, err)
require.NoError(t, plugin.Init())
plugin.Add(
testutil.MustMetric(
@ -83,7 +77,6 @@ func TestNanosecondPrecision(t *testing.T) {
time.Unix(0, 1),
),
)
require.NoError(t, err)
plugin.Add(
testutil.MustMetric(
@ -97,7 +90,6 @@ func TestNanosecondPrecision(t *testing.T) {
time.Unix(0, 1),
),
)
require.NoError(t, err)
var acc testutil.Accumulator
acc.SetPrecision(time.Second)
@ -120,11 +112,120 @@ func TestNanosecondPrecision(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestNoRounding(t *testing.T) {
plugin := &Merge{}
require.NoError(t, plugin.Init())
plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 23,
},
time.Unix(0, 1),
),
)
plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 2),
),
)
var acc testutil.Accumulator
acc.SetPrecision(time.Second)
plugin.Push(&acc)
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 23,
},
time.Unix(0, 1),
),
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 2),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestWithRounding(t *testing.T) {
plugin := &Merge{RoundTimestamp: config.Duration(10 * time.Nanosecond)}
require.NoError(t, plugin.Init())
plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 23,
},
time.Unix(0, 1),
),
)
plugin.Add(
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_guest": 42,
},
time.Unix(0, 2),
),
)
var acc testutil.Accumulator
acc.SetPrecision(time.Second)
plugin.Push(&acc)
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"cpu": "cpu0",
},
map[string]interface{}{
"time_idle": 23,
"time_guest": 42,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestReset(t *testing.T) {
plugin := &Merge{}
err := plugin.Init()
require.NoError(t, err)
require.NoError(t, plugin.Init())
plugin.Add(
testutil.MustMetric(
@ -138,7 +239,6 @@ func TestReset(t *testing.T) {
time.Unix(0, 0),
),
)
require.NoError(t, err)
var acc testutil.Accumulator
plugin.Push(&acc)
@ -157,7 +257,6 @@ func TestReset(t *testing.T) {
time.Unix(0, 0),
),
)
require.NoError(t, err)
plugin.Push(&acc)
@ -187,65 +286,85 @@ func TestReset(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
var m1 = metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f1": 1,
"f2": 2,
"f3": 3,
"f4": 4,
"f5": 5,
"f6": 6,
"f7": 7,
"f8": 8,
},
time.Now(),
)
var m2 = metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f8": 8,
"f9": 9,
"f10": 10,
"f11": 11,
"f12": 12,
"f13": 13,
"f14": 14,
"f15": 15,
"f16": 16,
},
m1.Time(),
)
func BenchmarkMergeOne(b *testing.B) {
var merger Merge
err := merger.Init()
require.NoError(b, err)
var acc testutil.NopAccumulator
require.NoError(b, merger.Init())
m := metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f1": 1,
"f2": 2,
"f3": 3,
"f4": 4,
"f5": 5,
"f6": 6,
"f7": 7,
"f8": 8,
},
time.Now(),
)
var acc testutil.NopAccumulator
for n := 0; n < b.N; n++ {
merger.Reset()
merger.Add(m1)
merger.Add(m)
merger.Push(&acc)
}
}
func BenchmarkMergeTwo(b *testing.B) {
var merger Merge
err := merger.Init()
require.NoError(b, err)
var acc testutil.NopAccumulator
require.NoError(b, merger.Init())
now := time.Now()
m1 := metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f1": 1,
"f2": 2,
"f3": 3,
"f4": 4,
"f5": 5,
"f6": 6,
"f7": 7,
"f8": 8,
},
now,
)
m2 := metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f8": 8,
"f9": 9,
"f10": 10,
"f11": 11,
"f12": 12,
"f13": 13,
"f14": 14,
"f15": 15,
"f16": 16,
},
now,
)
var acc testutil.NopAccumulator
for n := 0; n < b.N; n++ {
merger.Reset()
merger.Add(m1)

View File

@ -1,5 +1,11 @@
# Merge metrics into multifield metrics by series key
[[aggregators.merge]]
## Precision to round the metric timestamp to
## This is useful for cases where metrics to merge arrive within a small
## interval and thus vary in timestamp. The timestamp of the resulting metric
## is also rounded.
# round_timestamp_to = "1ns"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = true