feat(agent): Add option to skip re-running processors after aggregators (#14882)

This commit is contained in:
Joshua Powers 2024-02-23 14:10:33 -05:00 committed by GitHub
parent 3929a42504
commit f0656a4910
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 64 additions and 6 deletions

View File

@ -136,7 +136,7 @@ func (a *Agent) Run(ctx context.Context) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
@ -1013,7 +1013,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<-
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
@ -1112,7 +1112,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 {
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err

View File

@ -222,9 +222,7 @@ func TestCases(t *testing.T) {
// Process expected metrics and compare with resulting metrics
options := []cmp.Option{
testutil.IgnoreTags("host"),
}
if expected[0].Time().IsZero() {
options = append(options, testutil.IgnoreTime())
testutil.IgnoreTime(),
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})

View File

@ -0,0 +1,2 @@
metric value=420
metric value_min=4200,value_max=4200

View File

@ -0,0 +1 @@
metric value=42.0

View File

@ -0,0 +1,22 @@
# Test for not skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = false
[[inputs.file]]
files = ["testcases/aggregators-rerun-processors/input.influx"]
data_format = "influx"
[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''
[[aggregators.minmax]]
period = "1s"
drop_original = false

View File

@ -0,0 +1,2 @@
metric value=420
metric value_min=420,value_max=420

View File

@ -0,0 +1 @@
metric value=42.0

View File

@ -0,0 +1,22 @@
# Test for skipping processors after running aggregators
[agent]
omit_hostname = true
skip_processors_after_aggregators = true
[[inputs.file]]
files = ["testcases/aggregators-skip-processors/input.influx"]
data_format = "influx"
[[processors.starlark]]
source = '''
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''
[[aggregators.minmax]]
period = "1s"
drop_original = false

View File

@ -94,3 +94,8 @@
## stateful plugins on termination of Telegraf. If the file exists on start,
## the state in the file will be restored for the plugins.
# statefile = ""
## Flag to skip running processors after aggregators
## By default, processors are run a second time after aggregators. Changing
## this setting to true will skip the second run of processors.
# skip_processors_after_aggregators = false

View File

@ -265,6 +265,11 @@ type AgentConfig struct {
// Flag to always keep tags explicitly defined in the global tags section
// and ensure those tags always pass filtering.
AlwaysIncludeGlobalTags bool `toml:"always_include_global_tags"`
// Flag to skip running processors after aggregators
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`
}
// InputNames returns a list of strings of the configured inputs.