fix: Revert "Reset the flush interval timer when flush is requested or batch is ready. (#8953)" (#9800)

This reverts commit a6d2c4f254.
This commit is contained in:
Joshua Powers 2021-09-30 10:28:48 -06:00 committed by GitHub
parent 11193a3b4c
commit 70afc94d12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 16 deletions

View File

@ -775,7 +775,7 @@ func (a *Agent) runOutputs(
func (a *Agent) flushLoop(
ctx context.Context,
output *models.RunningOutput,
ticker *RollingTicker,
ticker Ticker,
) {
logError := func(err error) {
if err != nil {
@ -804,11 +804,15 @@ func (a *Agent) flushLoop(
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
case <-flushRequested:
ticker.Reset()
logError(a.flushOnce(output, ticker, output.Write))
case <-output.BatchReady:
ticker.Reset()
logError(a.flushOnce(output, ticker, output.WriteBatch))
// Favor the ticker over batch ready
select {
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
default:
logError(a.flushOnce(output, ticker, output.WriteBatch))
}
}
}
}

View File

@ -214,7 +214,6 @@ type RollingTicker struct {
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
timer *clock.Timer
}
func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
@ -231,12 +230,12 @@ func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *Rollin
}
d := t.next()
t.timer = clock.Timer(d)
timer := clock.Timer(d)
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx)
t.run(ctx, timer)
}()
return t
@ -246,28 +245,24 @@ func (t *RollingTicker) next() time.Duration {
return t.interval + internal.RandomDuration(t.jitter)
}
func (t *RollingTicker) run(ctx context.Context) {
func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) {
for {
select {
case <-ctx.Done():
t.timer.Stop()
timer.Stop()
return
case now := <-t.timer.C:
case now := <-timer.C:
select {
case t.ch <- now:
default:
}
t.Reset()
d := t.next()
timer.Reset(d)
}
}
}
// Reset the ticker to the next interval + jitter.
func (t *RollingTicker) Reset() {
t.timer.Reset(t.next())
}
func (t *RollingTicker) Elapsed() <-chan time.Time {
return t.ch
}