chore(agent): Add warning about changing default for 'skip_processors_after_aggregators' (#16302)

This commit is contained in:
Sven Rebhan 2024-12-13 17:46:37 +01:00 committed by GitHub
parent 36553aec1e
commit e15a3c8dc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 43 additions and 5 deletions

View File

@ -1,6 +1,16 @@
<!-- markdownlint-disable MD024 -->
# Changelog
## Unreleased
### Important Changes
- The default value of `skip_processors_after_aggregators` will change to `true`
with Telegraf `v1.40.0`, skip running the processors again after aggregators!
If you need the current default behavior, please explicitly set the option to
`false`! To silence the warning and use the future default behavior, please
explicitly set the option to `true`.
## v1.33.0 [2024-12-09]
### New Plugins

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/fatih/color"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
@ -106,6 +107,15 @@ func (a *Agent) Run(ctx context.Context) error {
time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))
// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}
log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
@ -136,7 +146,7 @@ func (a *Agent) Run(ctx context.Context) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err
@ -231,7 +241,7 @@ func (a *Agent) InitPlugins() error {
return fmt.Errorf("could not initialize aggregator %s: %w", aggregator.LogName(), err)
}
}
if !a.Config.Agent.SkipProcessorsAfterAggregators {
if !*a.Config.Agent.SkipProcessorsAfterAggregators {
for _, processor := range a.Config.AggProcessors {
err := processor.Init()
if err != nil {
@ -1000,6 +1010,15 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}
log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
@ -1013,7 +1032,7 @@ func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<-
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
var err error
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
@ -1096,6 +1115,15 @@ func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
// Set the default for processor skipping
if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
log.Print("W! [agent] ", color.YellowString(msg))
skipProcessorsAfterAggregators := false
a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
}
log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
@ -1113,7 +1141,7 @@ func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
procC := next
if len(a.Config.AggProcessors) != 0 && !a.Config.Agent.SkipProcessorsAfterAggregators {
if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
if err != nil {
return err

View File

@ -279,7 +279,7 @@ type AgentConfig struct {
// Flag to skip running processors after aggregators
// By default, processors are run a second time after aggregators. Changing
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`
SkipProcessorsAfterAggregators *bool `toml:"skip_processors_after_aggregators"`
// Number of attempts to obtain a remote configuration via a URL during
// startup. Set to -1 for unlimited attempts.