feat(aggregators.final): Add option to disable appending _final (#15268)

This commit is contained in:
Lars Stegman 2024-05-02 20:47:24 +02:00 committed by GitHub
parent 920f92fc53
commit d3f0ba9368
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 52 additions and 6 deletions

View File

@ -32,6 +32,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## aggregator and will not get sent to the output plugins.
# drop_original = false
## If false, _final is added to every field name
# keep_original_field_names = false
## The time that a series is not updated until considering it final. Ignored
## when output_strategy is "periodic".
# series_timeout = "5m"

View File

@ -15,8 +15,9 @@ import (
var sampleConfig string
type Final struct {
OutputStrategy string `toml:"output_strategy"`
SeriesTimeout config.Duration `toml:"series_timeout"`
OutputStrategy string `toml:"output_strategy"`
SeriesTimeout config.Duration `toml:"series_timeout"`
KeepOriginalFieldNames bool `toml:"keep_original_field_names"`
// The last metric for all series which are active
metricCache map[uint64]telegraf.Metric
@ -64,10 +65,16 @@ func (m *Final) Push(acc telegraf.Accumulator) {
// younger than that. So skip the output for this period.
continue
}
fields := map[string]interface{}{}
for _, field := range metric.FieldList() {
fields[field.Key+"_final"] = field.Value
var fields map[string]any
if m.KeepOriginalFieldNames {
fields = metric.Fields()
} else {
fields = map[string]any{}
for _, field := range metric.FieldList() {
fields[field.Key+"_final"] = field.Value
}
}
acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time())
delete(m.metricCache, id)
}

View File

@ -266,9 +266,42 @@ func TestOutputStrategyPeriodic(t *testing.T) {
metric.New(
"m",
tags,
map[string]interface{}{"a_final": int64(4)},
map[string]interface{}{
"a_final": 4,
},
now.Add(time.Second*-20),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}
func TestKeepOriginalFieldNames(t *testing.T) {
final := &Final{
OutputStrategy: "periodic",
SeriesTimeout: config.Duration(30 * time.Second),
KeepOriginalFieldNames: true,
}
require.NoError(t, final.Init())
now := time.Now()
tags := map[string]string{"foo": "bar"}
m1 := metric.New("m",
tags,
map[string]any{"a": 3},
now.Add(time.Second*-90))
var acc testutil.Accumulator
final.Add(m1)
final.Push(&acc)
expected := []telegraf.Metric{
metric.New(
"m",
tags,
map[string]any{"a": 3},
now.Add(time.Second*-90),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}

View File

@ -7,6 +7,9 @@
## aggregator and will not get sent to the output plugins.
# drop_original = false
## If false, _final is added to every field name
# keep_original_field_names = false
## The time that a series is not updated until considering it final. Ignored
## when output_strategy is "periodic".
# series_timeout = "5m"