diff --git a/agent/agent.go b/agent/agent.go index 2ff474dbd..f7ae32c95 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -323,7 +323,7 @@ func (a *Agent) runInputs( wg.Add(1) go func(input *models.RunningInput) { defer wg.Done() - a.gatherLoop(ctx, acc, input, ticker) + a.gatherLoop(ctx, acc, input, ticker, interval) }(input) } @@ -454,13 +454,14 @@ func (a *Agent) gatherLoop( acc telegraf.Accumulator, input *models.RunningInput, ticker Ticker, + interval time.Duration, ) { defer panicRecover(input) for { select { case <-ticker.Elapsed(): - err := a.gatherOnce(acc, input, ticker) + err := a.gatherOnce(acc, input, ticker, interval) if err != nil { acc.AddError(err) } @@ -476,18 +477,28 @@ func (a *Agent) gatherOnce( acc telegraf.Accumulator, input *models.RunningInput, ticker Ticker, + interval time.Duration, ) error { done := make(chan error) go func() { done <- input.Gather(acc) }() + // Only warn after interval seconds, even if the interval is started late. + // Intervals can start late if the previous interval went over or due to + // clock changes. + slowWarning := time.NewTicker(interval) + defer slowWarning.Stop() + for { select { case err := <-done: return err + case <-slowWarning.C: + log.Printf("W! [%s] Collection took longer than expected; not complete after interval of %s", + input.LogName(), interval) case <-ticker.Elapsed(): - log.Printf("W! [agent] [%s] did not complete within its interval", + log.Printf("D! [%s] Previous collection has not completed; scheduled collection skipped", input.LogName()) } } diff --git a/agent/tick.go b/agent/tick.go index 93e3a3d76..91b99712a 100644 --- a/agent/tick.go +++ b/agent/tick.go @@ -31,11 +31,12 @@ type Ticker interface { // no maximum sleep, when using large intervals alignment is not corrected // until the next tick. type AlignedTicker struct { - interval time.Duration - jitter time.Duration - ch chan time.Time - cancel context.CancelFunc - wg sync.WaitGroup + interval time.Duration + jitter time.Duration + minInterval time.Duration + ch chan time.Time + cancel context.CancelFunc + wg sync.WaitGroup } func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker { @@ -45,10 +46,11 @@ func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTic func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker { ctx, cancel := context.WithCancel(context.Background()) t := &AlignedTicker{ - interval: interval, - jitter: jitter, - ch: make(chan time.Time, 1), - cancel: cancel, + interval: interval, + jitter: jitter, + minInterval: interval / 100, + ch: make(chan time.Time, 1), + cancel: cancel, } d := t.next(now) @@ -64,7 +66,12 @@ func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock } func (t *AlignedTicker) next(now time.Time) time.Duration { - next := internal.AlignTime(now, t.interval) + // Add minimum interval size to avoid scheduling an interval that is + // exceptionally short. This avoids an issue that can occur where the + // previous interval ends slightly early due to very minor clock changes. + next := now.Add(t.minInterval) + + next = internal.AlignTime(next, t.interval) d := next.Sub(now) if d == 0 { d = t.interval