feat(outputs): Implement partial write errors (#16146)

This commit is contained in:
Sven Rebhan 2024-12-04 21:55:11 +01:00 committed by GitHub
parent b12eb5a60e
commit 0ea4c1422e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 427 additions and 223 deletions

View File

@ -37,3 +37,23 @@ func (e *FatalError) Error() string {
func (e *FatalError) Unwrap() error {
return e.Err
}
// PartialWriteError indicate that only a subset of the metrics were written
// successfully (i.e. accepted). The rejected metrics should be removed from
// the buffer without being successfully written. Please note: the metrics
// are specified as indices into the batch to be able to reference tracking
// metrics correctly.
type PartialWriteError struct {
Err error
MetricsAccept []int
MetricsReject []int
MetricsRejectErrors []error
}
func (e *PartialWriteError) Error() string {
return e.Err.Error()
}
func (e *PartialWriteError) Unwrap() error {
return e.Err
}

View File

@ -11,11 +11,56 @@ import (
var (
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string))
AgentMetricsRejected = selfstat.Register("agent", "metrics_rejected", make(map[string]string))
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string))
registerGob = sync.OnceFunc(func() { metric.Init() })
)
type Transaction struct {
// Batch of metrics to write
Batch []telegraf.Metric
// Accept denotes the indices of metrics that were successfully written
Accept []int
// Reject denotes the indices of metrics that were not written but should
// not be requeued
Reject []int
// Marks this transaction as valid
valid bool
// Internal state that can be used by the buffer implementation
state interface{}
}
func (tx *Transaction) AcceptAll() {
tx.Accept = make([]int, len(tx.Batch))
for i := range tx.Batch {
tx.Accept[i] = i
}
}
func (tx *Transaction) KeepAll() {}
func (tx *Transaction) InferKeep() []int {
used := make([]bool, len(tx.Batch))
for _, idx := range tx.Accept {
used[idx] = true
}
for _, idx := range tx.Reject {
used[idx] = true
}
keep := make([]int, 0, len(tx.Batch))
for i := range tx.Batch {
if !used[i] {
keep = append(keep, i)
}
}
return keep
}
type Buffer interface {
// Len returns the number of metrics currently in the buffer.
Len() int
@ -23,19 +68,15 @@ type Buffer interface {
// Add adds metrics to the buffer and returns number of dropped metrics.
Add(metrics ...telegraf.Metric) int
// Batch returns a slice containing up to batchSize of the oldest metrics not
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client.
Batch(batchSize int) []telegraf.Metric
// Batch starts a transaction by returning a slice of metrics up to the
// given batch-size starting from the oldest metric in the buffer. Metrics
// are ordered from oldest to newest and must not be modified by the plugin.
BeginTransaction(batchSize int) *Transaction
// Accept marks the batch, acquired from Batch(), as successfully written.
Accept(metrics []telegraf.Metric)
// Flush ends a metric and persists the buffer state
EndTransaction(*Transaction)
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
Reject([]telegraf.Metric)
// Stats returns the buffer statistics such as rejected, dropped and accepred metrics
// Stats returns the buffer statistics such as rejected, dropped and accepted metrics
Stats() BufferStats
// Close finalizes the buffer and closes all open resources
@ -47,6 +88,7 @@ type Buffer interface {
type BufferStats struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsRejected selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
@ -84,6 +126,11 @@ func NewBufferStats(name, alias string, capacity int) BufferStats {
"metrics_written",
tags,
),
MetricsRejected: selfstat.Register(
"write",
"metrics_rejected",
tags,
),
MetricsDropped: selfstat.Register(
"write",
"metrics_dropped",
@ -115,6 +162,12 @@ func (b *BufferStats) metricWritten(m telegraf.Metric) {
m.Accept()
}
func (b *BufferStats) metricRejected(m telegraf.Metric) {
AgentMetricsRejected.Incr(1)
b.MetricsRejected.Incr(1)
m.Reject()
}
func (b *BufferStats) metricDropped(m telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)

View File

@ -5,6 +5,8 @@ import (
"fmt"
"log"
"path/filepath"
"slices"
"sort"
"sync"
"github.com/tidwall/wal"
@ -31,6 +33,11 @@ type DiskBuffer struct {
// we have to do our best and track that the walfile "should" be empty, so that next
// write, we can remove the invalid entry (also skipping this entry if it is being read).
isEmpty bool
// The mask contains offsets of metric already removed during a previous
// transaction. Metrics at those offsets should not be contained in new
// batches.
mask []int
}
func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) {
@ -67,7 +74,11 @@ func (b *DiskBuffer) length() int {
if b.isEmpty {
return 0
}
// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)
return b.entries() - len(b.mask)
}
func (b *DiskBuffer) entries() int {
if b.readIndex() == 0 {
return 0
}
@ -121,28 +132,33 @@ func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool {
return false
}
func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction {
b.Lock()
defer b.Unlock()
if b.length() == 0 {
// no metrics in the wal file, so return an empty array
return make([]telegraf.Metric, 0)
return &Transaction{}
}
b.batchFirst = b.readIndex()
var metrics []telegraf.Metric
b.batchSize = 0
metrics := make([]telegraf.Metric, 0, batchSize)
offsets := make([]int, 0, batchSize)
readIndex := b.batchFirst
endIndex := b.writeIndex()
offset := 0
for batchSize > 0 && readIndex < endIndex {
data, err := b.file.Read(readIndex)
if err != nil {
panic(err)
}
readIndex++
offset++
m, err := metric.FromBytes(data)
if slices.Contains(b.mask, offset) {
// Metric is masked by a previous write and is scheduled for removal
continue
}
// Validate that a tracking metric is from this instance of telegraf and skip ones from older instances.
// A tracking metric can be skipped here because metric.Accept() is only called once data is successfully
@ -152,11 +168,12 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
// - ErrSkipTracking: means that the tracking information was unable to be found for a tracking ID.
// - Outside of range: means that the metric was guaranteed to be left over from the previous instance
// as it was here when we opened the wal file in this instance.
m, err := metric.FromBytes(data)
if err != nil {
if errors.Is(err, metric.ErrSkipTracking) {
// could not look up tracking information for metric, skip
continue
}
if err != nil {
// non-recoverable error in deserialization, abort
log.Printf("E! raw metric data: %v", data)
panic(err)
@ -167,31 +184,80 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
}
metrics = append(metrics, m)
offsets = append(offsets, offset)
b.batchSize++
batchSize--
}
return metrics
return &Transaction{Batch: metrics, valid: true, state: offsets}
}
func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
func (b *DiskBuffer) EndTransaction(tx *Transaction) {
if len(tx.Batch) == 0 {
return
}
// Ignore invalid transactions and make sure they can only be finished once
if !tx.valid {
return
}
tx.valid = false
// Get the metric offsets from the transaction
offsets := tx.state.([]int)
b.Lock()
defer b.Unlock()
if b.batchSize == 0 || len(batch) == 0 {
// nothing to accept
// Mark metrics which should be removed in the internal mask
remove := make([]int, 0, len(tx.Accept)+len(tx.Reject))
for _, idx := range tx.Accept {
b.metricWritten(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
for _, idx := range tx.Reject {
b.metricRejected(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
b.mask = append(b.mask, remove...)
sort.Ints(b.mask)
// Remove the metrics that are marked for removal from the front of the
// WAL file. All other metrics must be kept.
if len(b.mask) == 0 || b.mask[0] != 0 {
// Mask is empty or the first index is not the front of the file, so
// exit early as there is nothing to remove
return
}
for _, m := range batch {
b.metricWritten(m)
// Determine up to which index we can remove the entries from the WAL file
var removeIdx int
for i, offset := range b.mask {
if offset != i {
break
}
if b.length() == len(batch) {
b.emptyFile()
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize)
removeIdx = offset
}
// Remove the metrics in front from the WAL file
b.isEmpty = b.entries()-removeIdx-1 <= 0
if b.isEmpty {
// WAL files cannot be fully empty but need to contain at least one
// item to not throw an error
if err := b.file.TruncateFront(b.writeIndex()); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err)
}
} else {
if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err)
}
}
// Truncate the mask and update the relative offsets
b.mask = b.mask[:removeIdx]
for i := range b.mask {
b.mask[i] -= removeIdx
}
// check if the original end index is still valid, clear if not
@ -203,14 +269,6 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.BufferSize.Set(int64(b.length()))
}
func (b *DiskBuffer) Reject(_ []telegraf.Metric) {
// very little to do here as the disk buffer retains metrics in
// the wal file until a call to accept
b.Lock()
defer b.Unlock()
b.resetBatch()
}
func (b *DiskBuffer) Stats() BufferStats {
return b.BufferStats
}
@ -238,14 +296,3 @@ func (b *DiskBuffer) handleEmptyFile() {
}
b.isEmpty = false
}
func (b *DiskBuffer) emptyFile() {
if b.isEmpty || b.length() == 0 {
return
}
if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length())
panic(err)
}
b.isEmpty = true
}

View File

@ -27,9 +27,9 @@ func TestDiskBufferRetainsTrackingInformation(t *testing.T) {
defer buf.Close()
buf.Add(mm)
batch := buf.Batch(1)
buf.Accept(batch)
tx := buf.BeginTransaction(1)
tx.AcceptAll()
buf.EndTransaction(tx)
require.Equal(t, 1, delivered)
}
@ -85,11 +85,11 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) {
buf.Stats().MetricsDropped.Set(0)
defer buf.Close()
batch := buf.Batch(4)
tx := buf.BeginTransaction(4)
// Check that the tracking metric is skipped
expected := []telegraf.Metric{
metrics[0], metrics[1], metrics[2], metrics[4],
}
testutil.RequireMetricsEqual(t, expected, batch)
testutil.RequireMetricsEqual(t, expected, tx.Batch)
}

View File

@ -51,67 +51,67 @@ func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int {
return dropped
}
func (b *MemoryBuffer) Batch(batchSize int) []telegraf.Metric {
func (b *MemoryBuffer) BeginTransaction(batchSize int) *Transaction {
b.Lock()
defer b.Unlock()
outLen := min(b.size, batchSize)
out := make([]telegraf.Metric, outLen)
if outLen == 0 {
return out
return &Transaction{}
}
b.batchFirst = b.first
b.batchSize = outLen
batchIndex := b.batchFirst
for i := range out {
out[i] = b.buf[batchIndex]
batch := make([]telegraf.Metric, outLen)
for i := range batch {
batch[i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}
b.first = b.nextby(b.first, b.batchSize)
b.size -= outLen
return out
return &Transaction{Batch: batch, valid: true}
}
func (b *MemoryBuffer) Accept(batch []telegraf.Metric) {
func (b *MemoryBuffer) EndTransaction(tx *Transaction) {
b.Lock()
defer b.Unlock()
for _, m := range batch {
b.metricWritten(m)
}
b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}
func (b *MemoryBuffer) Reject(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
if len(batch) == 0 {
// Ignore invalid transactions and make sure they can only be finished once
if !tx.valid {
return
}
tx.valid = false
free := b.cap - b.size
restore := min(len(batch), free)
skip := len(batch) - restore
// Accept metrics
for _, idx := range tx.Accept {
b.metricWritten(tx.Batch[idx])
}
// Reject metrics
for _, idx := range tx.Reject {
b.metricRejected(tx.Batch[idx])
}
// Keep metrics
keep := tx.InferKeep()
if len(keep) > 0 {
restore := min(len(keep), b.cap-b.size)
b.first = b.prevby(b.first, restore)
b.size = min(b.size+restore, b.cap)
re := b.first
// Restore the metrics that fit into the buffer
current := b.first
for i := 0; i < restore; i++ {
b.buf[current] = tx.Batch[keep[i]]
current = b.next(current)
}
// Copy metrics from the batch back into the buffer
for i := range batch {
if i < skip {
b.metricDropped(batch[i])
} else {
b.buf[re] = batch[i]
re = b.next(re)
// Drop all remaining metrics
for i := restore; i < len(keep); i++ {
b.metricDropped(tx.Batch[keep[i]])
}
}

View File

@ -24,8 +24,9 @@ func TestMemoryBufferAcceptCallsMetricAccept(t *testing.T) {
},
}
buf.Add(mm, mm, mm)
batch := buf.Batch(2)
buf.Accept(batch)
tx := buf.BeginTransaction(2)
tx.AcceptAll()
buf.EndTransaction(tx)
require.Equal(t, 2, accept)
}

View File

@ -53,6 +53,7 @@ func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
s.Require().NoError(err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
buf.Stats().MetricsRejected.Set(0)
buf.Stats().MetricsDropped.Set(0)
return buf
}
@ -99,16 +100,16 @@ func (s *BufferSuiteTest) TestBufferBatchLenZero() {
buf := s.newTestBuffer(5)
defer buf.Close()
batch := buf.Batch(0)
s.Empty(batch)
tx := buf.BeginTransaction(0)
s.Empty(tx.Batch)
}
func (s *BufferSuiteTest) TestBufferBatchLenBufferEmpty() {
buf := s.newTestBuffer(5)
defer buf.Close()
batch := buf.Batch(2)
s.Empty(batch)
tx := buf.BeginTransaction(2)
s.Empty(tx.Batch)
}
func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() {
@ -117,8 +118,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m)
batch := buf.Batch(2)
s.Len(batch, 1)
tx := buf.BeginTransaction(2)
s.Len(tx.Batch, 1)
}
func (s *BufferSuiteTest) TestBufferBatchLenFill() {
@ -127,8 +128,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenFill() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m)
batch := buf.Batch(2)
s.Len(batch, 2)
tx := buf.BeginTransaction(2)
s.Len(tx.Batch, 2)
}
func (s *BufferSuiteTest) TestBufferBatchLenExact() {
@ -137,8 +138,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenExact() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m)
batch := buf.Batch(2)
s.Len(batch, 2)
tx := buf.BeginTransaction(2)
s.Len(tx.Batch, 2)
}
func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() {
@ -147,8 +148,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(6)
s.Len(batch, 5)
tx := buf.BeginTransaction(6)
s.Len(tx.Batch, 5)
}
func (s *BufferSuiteTest) TestBufferBatchWrap() {
@ -157,11 +158,12 @@ func (s *BufferSuiteTest) TestBufferBatchWrap() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(2)
buf.Accept(batch)
tx := buf.BeginTransaction(2)
tx.AcceptAll()
buf.EndTransaction(tx)
buf.Add(m, m)
batch = buf.Batch(5)
s.Len(batch, 5)
tx = buf.BeginTransaction(5)
s.Len(tx.Batch, 5)
}
func (s *BufferSuiteTest) TestBufferBatchLatest() {
@ -171,13 +173,13 @@ func (s *BufferSuiteTest) TestBufferBatchLatest() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferBatchLatestWrap() {
@ -193,13 +195,13 @@ func (s *BufferSuiteTest) TestBufferBatchLatestWrap() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferMultipleBatch() {
@ -212,7 +214,7 @@ func (s *BufferSuiteTest) TestBufferMultipleBatch() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)),
@ -220,14 +222,16 @@ func (s *BufferSuiteTest) TestBufferMultipleBatch() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
}, batch)
buf.Accept(batch)
batch = buf.Batch(5)
}, tx.Batch)
tx.AcceptAll()
buf.EndTransaction(tx)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)),
}, batch)
buf.Accept(batch)
}, tx.Batch)
tx.AcceptAll()
buf.EndTransaction(tx)
}
func (s *BufferSuiteTest) TestBufferRejectWithRoom() {
@ -237,14 +241,15 @@ func (s *BufferSuiteTest) TestBufferRejectWithRoom() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(0), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)),
@ -252,7 +257,7 @@ func (s *BufferSuiteTest) TestBufferRejectWithRoom() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() {
@ -264,12 +269,13 @@ func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
batch := buf.Batch(2)
buf.Reject(batch)
tx := buf.BeginTransaction(2)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(0), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)),
@ -277,7 +283,7 @@ func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectNoRoom() {
@ -291,18 +297,19 @@ func (s *BufferSuiteTest) TestBufferRejectNoRoom() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(3), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
@ -310,7 +317,7 @@ func (s *BufferSuiteTest) TestBufferRejectNoRoom() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectRoomExact() {
@ -319,16 +326,17 @@ func (s *BufferSuiteTest) TestBufferRejectRoomExact() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(0), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)),
@ -336,7 +344,7 @@ func (s *BufferSuiteTest) TestBufferRejectRoomExact() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() {
@ -350,16 +358,17 @@ func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(1)
tx := buf.BeginTransaction(1)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(1), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)),
@ -367,7 +376,7 @@ func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectPartialRoom() {
@ -381,16 +390,17 @@ func (s *BufferSuiteTest) TestBufferRejectPartialRoom() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(2), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)),
@ -398,7 +408,7 @@ func (s *BufferSuiteTest) TestBufferRejectPartialRoom() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
@ -412,7 +422,7 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
@ -435,11 +445,12 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)))
// buffer: 13, 14, 15, 11, 12; batch: 2, 3
s.Equal(int64(8), buf.Stats().MetricsDropped.Get())
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(10), buf.Stats().MetricsDropped.Get())
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)),
@ -447,7 +458,7 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectWrapped() {
@ -467,16 +478,17 @@ func (s *BufferSuiteTest) TestBufferRejectWrapped() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)))
batch := buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
batch = buf.Batch(5)
tx = buf.BeginTransaction(5)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)),
@ -484,7 +496,7 @@ func (s *BufferSuiteTest) TestBufferRejectWrapped() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() {
@ -498,36 +510,39 @@ func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0)))
batch = buf.Batch(3)
tx = buf.BeginTransaction(3)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)))
batch = buf.Batch(3)
tx = buf.BeginTransaction(3)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(16, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(19, 0)))
batch = buf.Batch(10)
tx = buf.BeginTransaction(10)
testutil.RequireMetricsEqual(s.T(),
[]telegraf.Metric{
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)),
@ -540,7 +555,7 @@ func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() {
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0)),
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(19, 0)),
}, batch)
}, tx.Batch)
}
func (s *BufferSuiteTest) TestBufferAddDropsOverwrittenMetrics() {
@ -565,8 +580,9 @@ func (s *BufferSuiteTest) TestBufferAcceptRemovesBatch() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m)
batch := buf.Batch(2)
buf.Accept(batch)
tx := buf.BeginTransaction(2)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(1, buf.Len())
}
@ -576,8 +592,9 @@ func (s *BufferSuiteTest) TestBufferRejectLeavesBatch() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m)
batch := buf.Batch(2)
buf.Reject(batch)
tx := buf.BeginTransaction(2)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(3, buf.Len())
}
@ -587,9 +604,10 @@ func (s *BufferSuiteTest) TestBufferAcceptWritesOverwrittenBatch() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
buf.Add(m, m, m, m, m)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(int64(0), buf.Stats().MetricsDropped.Get())
s.Equal(int64(5), buf.Stats().MetricsWritten.Get())
@ -605,9 +623,10 @@ func (s *BufferSuiteTest) TestBufferBatchRejectDropsOverwrittenBatch() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
buf.Add(m, m, m, m, m)
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(5), buf.Stats().MetricsDropped.Get())
s.Equal(int64(0), buf.Stats().MetricsWritten.Get())
@ -619,9 +638,10 @@ func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchAccept() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(m, m, m)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(int64(0), buf.Stats().MetricsDropped.Get(), "dropped")
s.Equal(int64(3), buf.Stats().MetricsWritten.Get(), "written")
}
@ -636,9 +656,10 @@ func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchReject() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(m, m, m)
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(int64(3), buf.Stats().MetricsDropped.Get())
s.Equal(int64(0), buf.Stats().MetricsWritten.Get())
}
@ -653,9 +674,10 @@ func (s *BufferSuiteTest) TestBufferMetricsBatchAcceptRemoved() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(m, m, m, m, m)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(int64(2), buf.Stats().MetricsDropped.Get())
s.Equal(int64(3), buf.Stats().MetricsWritten.Get())
}
@ -670,10 +692,10 @@ func (s *BufferSuiteTest) TestBufferWrapWithBatch() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m)
buf.Batch(3)
tx := buf.BeginTransaction(3)
buf.Add(m, m, m, m, m, m)
s.Equal(int64(1), buf.Stats().MetricsDropped.Get())
buf.EndTransaction(tx)
}
func (s *BufferSuiteTest) TestBufferBatchNotRemoved() {
@ -682,8 +704,9 @@ func (s *BufferSuiteTest) TestBufferBatchNotRemoved() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
buf.Batch(2)
tx := buf.BeginTransaction(2)
s.Equal(5, buf.Len())
buf.EndTransaction(tx)
}
func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() {
@ -692,9 +715,11 @@ func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() {
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
batch := buf.Batch(2)
buf.Reject(batch)
buf.Accept(batch)
tx := buf.BeginTransaction(2)
tx.KeepAll()
buf.EndTransaction(tx)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(5, buf.Len())
}
@ -734,10 +759,11 @@ func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNotInBatch() {
},
}
buf.Add(mm, mm, mm, mm, mm)
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(mm, mm, mm, mm)
s.Equal(2, reject)
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(4, reject)
}
@ -757,10 +783,11 @@ func (s *BufferSuiteTest) TestBufferRejectCallsMetricRejectWithOverwritten() {
},
}
buf.Add(mm, mm, mm, mm, mm)
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
buf.Add(mm, mm)
s.Equal(0, reject)
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(2, reject)
}
@ -780,13 +807,14 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndReject() {
},
}
buf.Add(mm, mm, mm, mm, mm)
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
buf.Add(mm, mm, mm, mm, mm)
buf.Add(mm, mm, mm, mm, mm)
buf.Add(mm, mm, mm, mm, mm)
buf.Add(mm, mm, mm, mm, mm)
s.Equal(15, reject)
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
s.Equal(20, reject)
}
@ -812,7 +840,7 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() {
buf.Add(mm, mm, mm)
buf.Add(mm, mm, mm, mm)
s.Equal(2, reject)
batch := buf.Batch(5)
tx := buf.BeginTransaction(5)
buf.Add(mm, mm, mm, mm)
s.Equal(2, reject)
buf.Add(mm, mm, mm, mm)
@ -821,7 +849,8 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() {
s.Equal(9, reject)
buf.Add(mm, mm, mm, mm)
s.Equal(13, reject)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(13, reject)
s.Equal(5, accept)
}
@ -830,14 +859,16 @@ func (s *BufferSuiteTest) TestBufferRejectEmptyBatch() {
buf := s.newTestBuffer(5)
defer buf.Close()
batch := buf.Batch(2)
tx := buf.BeginTransaction(2)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Reject(batch)
tx.KeepAll()
buf.EndTransaction(tx)
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
batch = buf.Batch(2)
for _, m := range batch {
tx = buf.BeginTransaction(2)
for _, m := range tx.Batch {
s.NotNil(m)
}
buf.EndTransaction(tx)
}
func (s *BufferSuiteTest) TestBufferFlushedPartial() {
@ -847,10 +878,11 @@ func (s *BufferSuiteTest) TestBufferFlushedPartial() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)))
batch := buf.Batch(2)
s.Len(batch, 2)
tx := buf.BeginTransaction(2)
s.Len(tx.Batch, 2)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(1, buf.Len())
}
@ -860,13 +892,48 @@ func (s *BufferSuiteTest) TestBufferFlushedFull() {
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))
batch := buf.Batch(2)
s.Len(batch, 2)
tx := buf.BeginTransaction(2)
s.Len(tx.Batch, 2)
buf.Accept(batch)
tx.AcceptAll()
buf.EndTransaction(tx)
s.Equal(0, buf.Len())
}
func (s *BufferSuiteTest) TestPartialWriteBackToFront() {
buf := s.newTestBuffer(5)
defer buf.Close()
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
buf.Add(m, m, m, m, m)
// Get a batch of all metrics but only reject the last one
tx := buf.BeginTransaction(5)
s.Len(tx.Batch, 5)
tx.Reject = []int{4}
buf.EndTransaction(tx)
s.Equal(4, buf.Len())
// Get the next batch which should miss the last metric
tx = buf.BeginTransaction(5)
s.Len(tx.Batch, 4)
tx.Accept = []int{3}
buf.EndTransaction(tx)
s.Equal(3, buf.Len())
// Now get the next batch and reject the remaining metrics
tx = buf.BeginTransaction(5)
s.Len(tx.Batch, 3)
tx.Accept = []int{0, 1, 2}
buf.EndTransaction(tx)
s.Equal(0, buf.Len())
s.Equal(int64(5), buf.Stats().MetricsAdded.Get(), "metrics added")
s.Equal(int64(4), buf.Stats().MetricsWritten.Get(), "metrics written")
s.Equal(int64(1), buf.Stats().MetricsRejected.Get(), "metrics rejected")
s.Equal(int64(0), buf.Stats().MetricsDropped.Get(), "metrics dropped")
}
type mockMetric struct {
telegraf.Metric
AcceptF func()

View File

@ -306,17 +306,16 @@ func (r *RunningOutput) Write() error {
nBuffer := r.buffer.Len()
nBatches := nBuffer/r.MetricBatchSize + 1
for i := 0; i < nBatches; i++ {
batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
break
tx := r.buffer.BeginTransaction(r.MetricBatchSize)
if len(tx.Batch) == 0 {
return nil
}
err := r.writeMetrics(batch)
err := r.writeMetrics(tx.Batch)
r.updateTransaction(tx, err)
r.buffer.EndTransaction(tx)
if err != nil {
r.buffer.Reject(batch)
return err
}
r.buffer.Accept(batch)
}
return nil
}
@ -334,19 +333,15 @@ func (r *RunningOutput) WriteBatch() error {
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
tx := r.buffer.BeginTransaction(r.MetricBatchSize)
if len(tx.Batch) == 0 {
return nil
}
err := r.writeMetrics(tx.Batch)
r.updateTransaction(tx, err)
r.buffer.EndTransaction(tx)
err := r.writeMetrics(batch)
if err != nil {
r.buffer.Reject(batch)
return err
}
r.buffer.Accept(batch)
return nil
}
func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
@ -367,6 +362,26 @@ func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
return err
}
func (r *RunningOutput) updateTransaction(tx *Transaction, err error) {
// No error indicates all metrics were written successfully
if err == nil {
tx.AcceptAll()
return
}
// A non-partial-write-error indicated none of the metrics were written
// successfully and we should keep them for the next write cycle
var writeErr *internal.PartialWriteError
if !errors.As(err, &writeErr) {
tx.KeepAll()
return
}
// Transfer the accepted and rejected indices based on the write error values
tx.Accept = writeErr.MetricsAccept
tx.Reject = writeErr.MetricsReject
}
func (r *RunningOutput) LogBufferStatus() {
nBuffer := r.buffer.Len()
if r.Config.BufferStrategy == "disk" {

View File

@ -433,6 +433,7 @@ func TestRunningOutputInternalMetrics(t *testing.T) {
"buffer_size": 0,
"errors": 0,
"metrics_added": 0,
"metrics_rejected": 0,
"metrics_dropped": 0,
"metrics_filtered": 0,
"metrics_written": 0,