fix(agent): add flushBatch method (#11615)
This commit is contained in:
parent
93121f3894
commit
0481a78ec6
|
|
@ -824,19 +824,13 @@ func (a *Agent) flushLoop(
|
||||||
case <-flushRequested:
|
case <-flushRequested:
|
||||||
logError(a.flushOnce(output, ticker, output.Write))
|
logError(a.flushOnce(output, ticker, output.Write))
|
||||||
case <-output.BatchReady:
|
case <-output.BatchReady:
|
||||||
// Favor the ticker over batch ready
|
logError(a.flushBatch(output, output.WriteBatch))
|
||||||
select {
|
|
||||||
case <-ticker.Elapsed():
|
|
||||||
logError(a.flushOnce(output, ticker, output.Write))
|
|
||||||
default:
|
|
||||||
logError(a.flushOnce(output, ticker, output.WriteBatch))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushOnce runs the output's Write function once, logging a warning each
|
// flushOnce runs the output's Write function once, logging a warning each
|
||||||
// interval it fails to complete before.
|
// interval it fails to complete before the flush interval elapses.
|
||||||
func (a *Agent) flushOnce(
|
func (a *Agent) flushOnce(
|
||||||
output *models.RunningOutput,
|
output *models.RunningOutput,
|
||||||
ticker Ticker,
|
ticker Ticker,
|
||||||
|
|
@ -860,6 +854,17 @@ func (a *Agent) flushOnce(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flushBatch runs the output's Write function once Unlike flushOnce the
|
||||||
|
// interval elapsing is not considered during these flushes.
|
||||||
|
func (a *Agent) flushBatch(
|
||||||
|
output *models.RunningOutput,
|
||||||
|
writeFunc func() error,
|
||||||
|
) error {
|
||||||
|
err := writeFunc()
|
||||||
|
output.LogBufferStatus()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Test runs the inputs, processors and aggregators for a single gather and
|
// Test runs the inputs, processors and aggregators for a single gather and
|
||||||
// writes the metrics to stdout.
|
// writes the metrics to stdout.
|
||||||
func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
|
func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue