From f0656a4910002d87f2568ed6cb5ea0da9ea38f04 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Fri, 23 Feb 2024 14:10:33 -0500 Subject: [PATCH] feat(agent): Add option to skip re-running processors after aggregators (#14882) --- agent/agent.go | 6 ++--- agent/agent_test.go | 4 +--- .../aggregators-rerun-processors/expected.out | 2 ++ .../aggregators-rerun-processors/input.influx | 1 + .../telegraf.conf | 22 +++++++++++++++++++ .../aggregators-skip-processors/expected.out | 2 ++ .../aggregators-skip-processors/input.influx | 1 + .../aggregators-skip-processors/telegraf.conf | 22 +++++++++++++++++++ cmd/telegraf/agent.conf | 5 +++++ config/config.go | 5 +++++ 10 files changed, 64 insertions(+), 6 deletions(-) create mode 100644 agent/testcases/aggregators-rerun-processors/expected.out create mode 100644 agent/testcases/aggregators-rerun-processors/input.influx create mode 100644 agent/testcases/aggregators-rerun-processors/telegraf.conf create mode 100644 agent/testcases/aggregators-skip-processors/expected.out create mode 100644 agent/testcases/aggregators-skip-processors/input.influx create mode 100644 agent/testcases/aggregators-skip-processors/telegraf.conf diff --git a/agent/agent.go b/agent/agent.go index 9beb5ba38..ca236ce8f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -136,7 +136,7 @@ func (a *Agent) Run(ctx context.Context) error { var au *aggregatorUnit if len(a.Config.Aggregators) != 0 { aggC := next - if len(a.Config.AggProcessors) != 0 { + if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators { aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors) if err != nil { return err @@ -1013,7 +1013,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- var au *aggregatorUnit if len(a.Config.Aggregators) != 0 { procC := next - if len(a.Config.AggProcessors) != 0 { + if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators { procC, apu, err = a.startProcessors(next, a.Config.AggProcessors) if err != nil { return err @@ -1112,7 +1112,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error { var au *aggregatorUnit if len(a.Config.Aggregators) != 0 { procC := next - if len(a.Config.AggProcessors) != 0 { + if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators { procC, apu, err = a.startProcessors(next, a.Config.AggProcessors) if err != nil { return err diff --git a/agent/agent_test.go b/agent/agent_test.go index 77e8fbead..849b491b7 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -222,9 +222,7 @@ func TestCases(t *testing.T) { // Process expected metrics and compare with resulting metrics options := []cmp.Option{ testutil.IgnoreTags("host"), - } - if expected[0].Time().IsZero() { - options = append(options, testutil.IgnoreTime()) + testutil.IgnoreTime(), } testutil.RequireMetricsEqual(t, expected, actual, options...) }) diff --git a/agent/testcases/aggregators-rerun-processors/expected.out b/agent/testcases/aggregators-rerun-processors/expected.out new file mode 100644 index 000000000..ea610c4ef --- /dev/null +++ b/agent/testcases/aggregators-rerun-processors/expected.out @@ -0,0 +1,2 @@ +metric value=420 +metric value_min=4200,value_max=4200 diff --git a/agent/testcases/aggregators-rerun-processors/input.influx b/agent/testcases/aggregators-rerun-processors/input.influx new file mode 100644 index 000000000..194bd0264 --- /dev/null +++ b/agent/testcases/aggregators-rerun-processors/input.influx @@ -0,0 +1 @@ +metric value=42.0 diff --git a/agent/testcases/aggregators-rerun-processors/telegraf.conf b/agent/testcases/aggregators-rerun-processors/telegraf.conf new file mode 100644 index 000000000..2cc749bfc --- /dev/null +++ b/agent/testcases/aggregators-rerun-processors/telegraf.conf @@ -0,0 +1,22 @@ +# Test for not skipping processors after running aggregators +[agent] + omit_hostname = true + skip_processors_after_aggregators = false + +[[inputs.file]] + files = ["testcases/aggregators-rerun-processors/input.influx"] + data_format = "influx" + +[[processors.starlark]] + source = ''' +def apply(metric): + for k, v in metric.fields.items(): + if type(v) == "float": + metric.fields[k] = v * 10 + return metric +''' + +[[aggregators.minmax]] + period = "1s" + drop_original = false + diff --git a/agent/testcases/aggregators-skip-processors/expected.out b/agent/testcases/aggregators-skip-processors/expected.out new file mode 100644 index 000000000..7a006b8fa --- /dev/null +++ b/agent/testcases/aggregators-skip-processors/expected.out @@ -0,0 +1,2 @@ +metric value=420 +metric value_min=420,value_max=420 diff --git a/agent/testcases/aggregators-skip-processors/input.influx b/agent/testcases/aggregators-skip-processors/input.influx new file mode 100644 index 000000000..194bd0264 --- /dev/null +++ b/agent/testcases/aggregators-skip-processors/input.influx @@ -0,0 +1 @@ +metric value=42.0 diff --git a/agent/testcases/aggregators-skip-processors/telegraf.conf b/agent/testcases/aggregators-skip-processors/telegraf.conf new file mode 100644 index 000000000..8dd2f5703 --- /dev/null +++ b/agent/testcases/aggregators-skip-processors/telegraf.conf @@ -0,0 +1,22 @@ +# Test for skipping processors after running aggregators +[agent] + omit_hostname = true + skip_processors_after_aggregators = true + +[[inputs.file]] + files = ["testcases/aggregators-skip-processors/input.influx"] + data_format = "influx" + +[[processors.starlark]] + source = ''' +def apply(metric): + for k, v in metric.fields.items(): + if type(v) == "float": + metric.fields[k] = v * 10 + return metric +''' + +[[aggregators.minmax]] + period = "1s" + drop_original = false + diff --git a/cmd/telegraf/agent.conf b/cmd/telegraf/agent.conf index 919b63d09..bb9ea0ab8 100644 --- a/cmd/telegraf/agent.conf +++ b/cmd/telegraf/agent.conf @@ -94,3 +94,8 @@ ## stateful plugins on termination of Telegraf. If the file exists on start, ## the state in the file will be restored for the plugins. # statefile = "" + + ## Flag to skip running processors after aggregators + ## By default, processors are run a second time after aggregators. Changing + ## this setting to true will skip the second run of processors. + # skip_processors_after_aggregators = false diff --git a/config/config.go b/config/config.go index 2a8448a8c..394d22d96 100644 --- a/config/config.go +++ b/config/config.go @@ -265,6 +265,11 @@ type AgentConfig struct { // Flag to always keep tags explicitly defined in the global tags section // and ensure those tags always pass filtering. AlwaysIncludeGlobalTags bool `toml:"always_include_global_tags"` + + // Flag to skip running processors after aggregators + // By default, processors are run a second time after aggregators. Changing + // this setting to true will skip the second run of processors. + SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"` } // InputNames returns a list of strings of the configured inputs.