From ab293e853c55a8a5e80fcec6aa450d2b6cdee7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Fri, 14 Oct 2022 16:48:47 +0200 Subject: [PATCH] fix(regression): Fixes problem with metrics not exposed by plugins. (#12016) --- agent/agent.go | 93 ++++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index aca1d4656..43670dafc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -280,10 +280,50 @@ func (a *Agent) runInputs( unit *inputUnit, ) { var wg sync.WaitGroup + tickers := make([]Ticker, len(unit.inputs)) for _, input := range unit.inputs { - a.runInput(ctx, startTime, unit, input, &wg) - } + // Overwrite agent interval if this plugin has its own. + interval := time.Duration(a.Config.Agent.Interval) + if input.Config.Interval != 0 { + interval = input.Config.Interval + } + // Overwrite agent precision if this plugin has its own. + precision := time.Duration(a.Config.Agent.Precision) + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + + // Overwrite agent collection_jitter if this plugin has its own. + jitter := time.Duration(a.Config.Agent.CollectionJitter) + if input.Config.CollectionJitter != 0 { + jitter = input.Config.CollectionJitter + } + + // Overwrite agent collection_offset if this plugin has its own. + offset := time.Duration(a.Config.Agent.CollectionOffset) + if input.Config.CollectionOffset != 0 { + offset = input.Config.CollectionOffset + } + + var ticker Ticker + if a.Config.Agent.RoundInterval { + ticker = NewAlignedTicker(startTime, interval, jitter, offset) + } else { + ticker = NewUnalignedTicker(interval, jitter, offset) + } + tickers = append(tickers, ticker) + + acc := NewAccumulator(input, unit.dst) + acc.SetPrecision(getPrecision(precision, interval)) + + wg.Add(1) + go func(input *models.RunningInput) { + defer wg.Done() + a.gatherLoop(ctx, acc, input, ticker, interval) + }(input) + } + defer stopTickers(tickers) wg.Wait() log.Printf("D! [agent] Stopping service inputs") @@ -293,49 +333,6 @@ func (a *Agent) runInputs( log.Printf("D! [agent] Input channel closed") } -func (a *Agent) runInput(ctx context.Context, startTime time.Time, unit *inputUnit, input *models.RunningInput, wg *sync.WaitGroup) { - // Overwrite agent interval if this plugin has its own. - interval := time.Duration(a.Config.Agent.Interval) - if input.Config.Interval != 0 { - interval = input.Config.Interval - } - - // Overwrite agent precision if this plugin has its own. - precision := time.Duration(a.Config.Agent.Precision) - if input.Config.Precision != 0 { - precision = input.Config.Precision - } - - // Overwrite agent collection_jitter if this plugin has its own. - jitter := time.Duration(a.Config.Agent.CollectionJitter) - if input.Config.CollectionJitter != 0 { - jitter = input.Config.CollectionJitter - } - - // Overwrite agent collection_offset if this plugin has its own. - offset := time.Duration(a.Config.Agent.CollectionOffset) - if input.Config.CollectionOffset != 0 { - offset = input.Config.CollectionOffset - } - - var ticker Ticker - if a.Config.Agent.RoundInterval { - ticker = NewAlignedTicker(startTime, interval, jitter, offset) - } else { - ticker = NewUnalignedTicker(interval, jitter, offset) - } - defer ticker.Stop() - - acc := NewAccumulator(input, unit.dst) - acc.SetPrecision(getPrecision(precision, interval)) - - wg.Add(1) - go func(input *models.RunningInput) { - defer wg.Done() - a.gatherLoop(ctx, acc, input, ticker, interval) - }(input) -} - // testStartInputs is a variation of startInputs for use in --test and --once // mode. It differs by logging Start errors and returning only plugins // successfully started. @@ -1110,3 +1107,9 @@ func panicRecover(input *models.RunningInput) { "https://github.com/influxdata/telegraf/issues/new/choose") } } + +func stopTickers(tickers []Ticker) { + for _, ticker := range tickers { + ticker.Stop() + } +}