Reset the flush interval timer when flush is requested or batch is ready. (#8953)

* Reset the flush interval timer when flush is requested or batch is ready, so that timer doesn't expire while one of those flushes is occurring.

* Update tick.go
This commit is contained in:
David Bennett 2021-03-09 14:35:18 -05:00 committed by GitHub
parent 380911ffb3
commit a6d2c4f254
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 15 deletions

View File

@ -793,7 +793,7 @@ func (a *Agent) runOutputs(
func (a *Agent) flushLoop( func (a *Agent) flushLoop(
ctx context.Context, ctx context.Context,
output *models.RunningOutput, output *models.RunningOutput,
ticker Ticker, ticker *RollingTicker,
) { ) {
logError := func(err error) { logError := func(err error) {
if err != nil { if err != nil {
@ -822,15 +822,11 @@ func (a *Agent) flushLoop(
case <-ticker.Elapsed(): case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write)) logError(a.flushOnce(output, ticker, output.Write))
case <-flushRequested: case <-flushRequested:
ticker.Reset()
logError(a.flushOnce(output, ticker, output.Write)) logError(a.flushOnce(output, ticker, output.Write))
case <-output.BatchReady: case <-output.BatchReady:
// Favor the ticker over batch ready ticker.Reset()
select { logError(a.flushOnce(output, ticker, output.WriteBatch))
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
default:
logError(a.flushOnce(output, ticker, output.WriteBatch))
}
} }
} }
} }

View File

@ -216,6 +216,7 @@ type RollingTicker struct {
ch chan time.Time ch chan time.Time
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
timer *clock.Timer
} }
func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
@ -232,12 +233,12 @@ func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *Rollin
} }
d := t.next() d := t.next()
timer := clock.Timer(d) t.timer = clock.Timer(d)
t.wg.Add(1) t.wg.Add(1)
go func() { go func() {
defer t.wg.Done() defer t.wg.Done()
t.run(ctx, timer) t.run(ctx)
}() }()
return t return t
@ -247,24 +248,28 @@ func (t *RollingTicker) next() time.Duration {
return t.interval + internal.RandomDuration(t.jitter) return t.interval + internal.RandomDuration(t.jitter)
} }
func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) { func (t *RollingTicker) run(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
timer.Stop() t.timer.Stop()
return return
case now := <-timer.C: case now := <-t.timer.C:
select { select {
case t.ch <- now: case t.ch <- now:
default: default:
} }
d := t.next() t.Reset()
timer.Reset(d)
} }
} }
} }
// Reset the ticker to the next interval + jitter.
func (t *RollingTicker) Reset() {
t.timer.Reset(t.next())
}
func (t *RollingTicker) Elapsed() <-chan time.Time { func (t *RollingTicker) Elapsed() <-chan time.Time {
return t.ch return t.ch
} }