diff --git a/agent/agent.go b/agent/agent.go index e7ffee322..1ac5f2b0b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -793,7 +793,7 @@ func (a *Agent) runOutputs( func (a *Agent) flushLoop( ctx context.Context, output *models.RunningOutput, - ticker Ticker, + ticker *RollingTicker, ) { logError := func(err error) { if err != nil { @@ -822,15 +822,11 @@ func (a *Agent) flushLoop( case <-ticker.Elapsed(): logError(a.flushOnce(output, ticker, output.Write)) case <-flushRequested: + ticker.Reset() logError(a.flushOnce(output, ticker, output.Write)) case <-output.BatchReady: - // Favor the ticker over batch ready - select { - case <-ticker.Elapsed(): - logError(a.flushOnce(output, ticker, output.Write)) - default: - logError(a.flushOnce(output, ticker, output.WriteBatch)) - } + ticker.Reset() + logError(a.flushOnce(output, ticker, output.WriteBatch)) } } } diff --git a/agent/tick.go b/agent/tick.go index 91b99712a..6afef2fa7 100644 --- a/agent/tick.go +++ b/agent/tick.go @@ -216,6 +216,7 @@ type RollingTicker struct { ch chan time.Time cancel context.CancelFunc wg sync.WaitGroup + timer *clock.Timer } func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { @@ -232,12 +233,12 @@ func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *Rollin } d := t.next() - timer := clock.Timer(d) + t.timer = clock.Timer(d) t.wg.Add(1) go func() { defer t.wg.Done() - t.run(ctx, timer) + t.run(ctx) }() return t @@ -247,24 +248,28 @@ func (t *RollingTicker) next() time.Duration { return t.interval + internal.RandomDuration(t.jitter) } -func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) { +func (t *RollingTicker) run(ctx context.Context) { for { select { case <-ctx.Done(): - timer.Stop() + t.timer.Stop() return - case now := <-timer.C: + case now := <-t.timer.C: select { case t.ch <- now: default: } - d := t.next() - timer.Reset(d) + t.Reset() } } } +// Reset the ticker to the next interval + jitter. +func (t *RollingTicker) Reset() { + t.timer.Reset(t.next()) +} + func (t *RollingTicker) Elapsed() <-chan time.Time { return t.ch }