Log after interval has elapsed; skip short intervals (#7854)

This commit is contained in:
Daniel Nelson 2020-07-17 13:46:07 -07:00 committed by GitHub
parent 0bcc515879
commit 38c01b498d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 13 deletions

View File

@ -323,7 +323,7 @@ func (a *Agent) runInputs(
wg.Add(1) wg.Add(1)
go func(input *models.RunningInput) { go func(input *models.RunningInput) {
defer wg.Done() defer wg.Done()
a.gatherLoop(ctx, acc, input, ticker) a.gatherLoop(ctx, acc, input, ticker, interval)
}(input) }(input)
} }
@ -454,13 +454,14 @@ func (a *Agent) gatherLoop(
acc telegraf.Accumulator, acc telegraf.Accumulator,
input *models.RunningInput, input *models.RunningInput,
ticker Ticker, ticker Ticker,
interval time.Duration,
) { ) {
defer panicRecover(input) defer panicRecover(input)
for { for {
select { select {
case <-ticker.Elapsed(): case <-ticker.Elapsed():
err := a.gatherOnce(acc, input, ticker) err := a.gatherOnce(acc, input, ticker, interval)
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
} }
@ -476,18 +477,28 @@ func (a *Agent) gatherOnce(
acc telegraf.Accumulator, acc telegraf.Accumulator,
input *models.RunningInput, input *models.RunningInput,
ticker Ticker, ticker Ticker,
interval time.Duration,
) error { ) error {
done := make(chan error) done := make(chan error)
go func() { go func() {
done <- input.Gather(acc) done <- input.Gather(acc)
}() }()
// Only warn after interval seconds, even if the interval is started late.
// Intervals can start late if the previous interval went over or due to
// clock changes.
slowWarning := time.NewTicker(interval)
defer slowWarning.Stop()
for { for {
select { select {
case err := <-done: case err := <-done:
return err return err
case <-slowWarning.C:
log.Printf("W! [%s] Collection took longer than expected; not complete after interval of %s",
input.LogName(), interval)
case <-ticker.Elapsed(): case <-ticker.Elapsed():
log.Printf("W! [agent] [%s] did not complete within its interval", log.Printf("D! [%s] Previous collection has not completed; scheduled collection skipped",
input.LogName()) input.LogName())
} }
} }

View File

@ -31,11 +31,12 @@ type Ticker interface {
// no maximum sleep, when using large intervals alignment is not corrected // no maximum sleep, when using large intervals alignment is not corrected
// until the next tick. // until the next tick.
type AlignedTicker struct { type AlignedTicker struct {
interval time.Duration interval time.Duration
jitter time.Duration jitter time.Duration
ch chan time.Time minInterval time.Duration
cancel context.CancelFunc ch chan time.Time
wg sync.WaitGroup cancel context.CancelFunc
wg sync.WaitGroup
} }
func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker { func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker {
@ -45,10 +46,11 @@ func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTic
func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker { func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t := &AlignedTicker{ t := &AlignedTicker{
interval: interval, interval: interval,
jitter: jitter, jitter: jitter,
ch: make(chan time.Time, 1), minInterval: interval / 100,
cancel: cancel, ch: make(chan time.Time, 1),
cancel: cancel,
} }
d := t.next(now) d := t.next(now)
@ -64,7 +66,12 @@ func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock
} }
func (t *AlignedTicker) next(now time.Time) time.Duration { func (t *AlignedTicker) next(now time.Time) time.Duration {
next := internal.AlignTime(now, t.interval) // Add minimum interval size to avoid scheduling an interval that is
// exceptionally short. This avoids an issue that can occur where the
// previous interval ends slightly early due to very minor clock changes.
next := now.Add(t.minInterval)
next = internal.AlignTime(next, t.interval)
d := next.Sub(now) d := next.Sub(now)
if d == 0 { if d == 0 {
d = t.interval d = t.interval