From 6da035ba44a0679a951b70223828712fd32974a1 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Tue, 25 Jun 2024 08:46:23 -0500 Subject: [PATCH] chore(agent): Extract buffer into interface (#15545) --- models/buffer.go | 227 ++-------- models/buffer_mem.go | 186 ++++++++ models/buffer_mem_test.go | 25 ++ models/buffer_suite_test.go | 817 ++++++++++++++++++++++++++++++++++++ models/buffer_test.go | 726 -------------------------------- models/running_output.go | 12 +- 6 files changed, 1080 insertions(+), 913 deletions(-) create mode 100644 models/buffer_mem.go create mode 100644 models/buffer_mem_test.go create mode 100644 models/buffer_suite_test.go delete mode 100644 models/buffer_test.go diff --git a/models/buffer.go b/models/buffer.go index 3f97a43a6..92ea02178 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -1,7 +1,7 @@ package models import ( - "sync" + "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/selfstat" @@ -12,18 +12,31 @@ var ( AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) -// Buffer stores metrics in a circular buffer. -type Buffer struct { - sync.Mutex - buf []telegraf.Metric - first int // index of the first/oldest metric - last int // one after the index of the last/newest metric - size int // number of metrics currently in the buffer - cap int // the capacity of the buffer +type Buffer interface { + // Len returns the number of metrics currently in the buffer. + Len() int - batchFirst int // index of the first metric in the batch - batchSize int // number of metrics currently in the batch + // Add adds metrics to the buffer and returns number of dropped metrics. + Add(metrics ...telegraf.Metric) int + // Batch returns a slice containing up to batchSize of the oldest metrics not + // yet dropped. Metrics are ordered from oldest to newest in the batch. The + // batch must not be modified by the client. + Batch(batchSize int) []telegraf.Metric + + // Accept marks the batch, acquired from Batch(), as successfully written. + Accept(metrics []telegraf.Metric) + + // Reject returns the batch, acquired from Batch(), to the buffer and marks it + // as unsent. + Reject([]telegraf.Metric) + + Stats() BufferStats +} + +// BufferStats holds common metrics used for buffer implementations. +// Implementations of Buffer should embed this struct in them. +type BufferStats struct { MetricsAdded selfstat.Stat MetricsWritten selfstat.Stat MetricsDropped selfstat.Stat @@ -32,19 +45,23 @@ type Buffer struct { } // NewBuffer returns a new empty Buffer with the given capacity. 
-func NewBuffer(name string, alias string, capacity int) *Buffer { +func NewBuffer(name string, alias string, capacity int, strategy string, _ string) (Buffer, error) { + bs := NewBufferStats(name, alias, capacity) + + switch strategy { + case "", "memory": + return NewMemoryBuffer(capacity, bs) + } + return nil, fmt.Errorf("invalid buffer strategy %q", strategy) +} + +func NewBufferStats(name string, alias string, capacity int) BufferStats { tags := map[string]string{"output": name} if alias != "" { tags["alias"] = alias } - b := &Buffer{ - buf: make([]telegraf.Metric, capacity), - first: 0, - last: 0, - size: 0, - cap: capacity, - + bs := BufferStats{ MetricsAdded: selfstat.Register( "write", "metrics_added", @@ -71,183 +88,23 @@ func NewBuffer(name string, alias string, capacity int) *Buffer { tags, ), } - b.BufferSize.Set(int64(0)) - b.BufferLimit.Set(int64(capacity)) - return b + bs.BufferSize.Set(int64(0)) + bs.BufferLimit.Set(int64(capacity)) + return bs } -// Len returns the number of metrics currently in the buffer. -func (b *Buffer) Len() int { - b.Lock() - defer b.Unlock() - - return b.length() -} - -func (b *Buffer) length() int { - return min(b.size+b.batchSize, b.cap) -} - -func (b *Buffer) metricAdded() { +func (b *BufferStats) metricAdded() { b.MetricsAdded.Incr(1) } -func (b *Buffer) metricWritten(metric telegraf.Metric) { +func (b *BufferStats) metricWritten(metric telegraf.Metric) { AgentMetricsWritten.Incr(1) b.MetricsWritten.Incr(1) metric.Accept() } -func (b *Buffer) metricDropped(metric telegraf.Metric) { +func (b *BufferStats) metricDropped(metric telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) metric.Reject() } - -func (b *Buffer) addMetric(m telegraf.Metric) int { - dropped := 0 - // Check if Buffer is full - if b.size == b.cap { - b.metricDropped(b.buf[b.last]) - dropped++ - - if b.batchSize > 0 { - b.batchSize-- - b.batchFirst = b.next(b.batchFirst) - } - } - - b.metricAdded() - - b.buf[b.last] = m - b.last = b.next(b.last) - - if b.size == b.cap { - b.first = b.next(b.first) - } - - b.size = min(b.size+1, b.cap) - return dropped -} - -// Add adds metrics to the buffer and returns number of dropped metrics. -func (b *Buffer) Add(metrics ...telegraf.Metric) int { - b.Lock() - defer b.Unlock() - - dropped := 0 - for i := range metrics { - if n := b.addMetric(metrics[i]); n != 0 { - dropped += n - } - } - - b.BufferSize.Set(int64(b.length())) - return dropped -} - -// Batch returns a slice containing up to batchSize of the oldest metrics not -// yet dropped. Metrics are ordered from oldest to newest in the batch. The -// batch must not be modified by the client. -func (b *Buffer) Batch(batchSize int) []telegraf.Metric { - b.Lock() - defer b.Unlock() - - outLen := min(b.size, batchSize) - out := make([]telegraf.Metric, outLen) - if outLen == 0 { - return out - } - - b.batchFirst = b.first - b.batchSize = outLen - - batchIndex := b.batchFirst - for i := range out { - out[i] = b.buf[batchIndex] - b.buf[batchIndex] = nil - batchIndex = b.next(batchIndex) - } - - b.first = b.nextby(b.first, b.batchSize) - b.size -= outLen - return out -} - -// Accept marks the batch, acquired from Batch(), as successfully written. -func (b *Buffer) Accept(batch []telegraf.Metric) { - b.Lock() - defer b.Unlock() - - for _, m := range batch { - b.metricWritten(m) - } - - b.resetBatch() - b.BufferSize.Set(int64(b.length())) -} - -// Reject returns the batch, acquired from Batch(), to the buffer and marks it -// as unsent. 
-func (b *Buffer) Reject(batch []telegraf.Metric) { - b.Lock() - defer b.Unlock() - - if len(batch) == 0 { - return - } - - free := b.cap - b.size - restore := min(len(batch), free) - skip := len(batch) - restore - - b.first = b.prevby(b.first, restore) - b.size = min(b.size+restore, b.cap) - - re := b.first - - // Copy metrics from the batch back into the buffer - for i := range batch { - if i < skip { - b.metricDropped(batch[i]) - } else { - b.buf[re] = batch[i] - re = b.next(re) - } - } - - b.resetBatch() - b.BufferSize.Set(int64(b.length())) -} - -// next returns the next index with wrapping. -func (b *Buffer) next(index int) int { - index++ - if index == b.cap { - return 0 - } - return index -} - -// nextby returns the index that is count newer with wrapping. -func (b *Buffer) nextby(index, count int) int { - index += count - index %= b.cap - return index -} - -// prevby returns the index that is count older with wrapping. -func (b *Buffer) prevby(index, count int) int { - index -= count - for index < 0 { - index += b.cap - } - - index %= b.cap - return index -} - -func (b *Buffer) resetBatch() { - b.batchFirst = 0 - b.batchSize = 0 -} diff --git a/models/buffer_mem.go b/models/buffer_mem.go new file mode 100644 index 000000000..88216c9f8 --- /dev/null +++ b/models/buffer_mem.go @@ -0,0 +1,186 @@ +package models + +import ( + "sync" + + "github.com/influxdata/telegraf" +) + +// MemoryBuffer stores metrics in a circular buffer. +type MemoryBuffer struct { + sync.Mutex + BufferStats + + buf []telegraf.Metric + first int // index of the first/oldest metric + last int // one after the index of the last/newest metric + size int // number of metrics currently in the buffer + cap int // the capacity of the buffer + + batchFirst int // index of the first metric in the batch + batchSize int // number of metrics currently in the batch +} + +func NewMemoryBuffer(capacity int, stats BufferStats) (*MemoryBuffer, error) { + return &MemoryBuffer{ + BufferStats: stats, + buf: make([]telegraf.Metric, capacity), + cap: capacity, + }, nil +} + +func (b *MemoryBuffer) Len() int { + b.Lock() + defer b.Unlock() + + return b.length() +} + +func (b *MemoryBuffer) length() int { + return min(b.size+b.batchSize, b.cap) +} + +func (b *MemoryBuffer) addMetric(m telegraf.Metric) int { + dropped := 0 + // Check if Buffer is full + if b.size == b.cap { + b.metricDropped(b.buf[b.last]) + dropped++ + + if b.batchSize > 0 { + b.batchSize-- + b.batchFirst = b.next(b.batchFirst) + } + } + + b.metricAdded() + + b.buf[b.last] = m + b.last = b.next(b.last) + + if b.size == b.cap { + b.first = b.next(b.first) + } + + b.size = min(b.size+1, b.cap) + return dropped +} + +func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int { + b.Lock() + defer b.Unlock() + + dropped := 0 + for i := range metrics { + if n := b.addMetric(metrics[i]); n != 0 { + dropped += n + } + } + + b.BufferSize.Set(int64(b.length())) + return dropped +} + +func (b *MemoryBuffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() + defer b.Unlock() + + outLen := min(b.size, batchSize) + out := make([]telegraf.Metric, outLen) + if outLen == 0 { + return out + } + + b.batchFirst = b.first + b.batchSize = outLen + + batchIndex := b.batchFirst + for i := range out { + out[i] = b.buf[batchIndex] + b.buf[batchIndex] = nil + batchIndex = b.next(batchIndex) + } + + b.first = b.nextby(b.first, b.batchSize) + b.size -= outLen + return out +} + +func (b *MemoryBuffer) Accept(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + for _, m := range batch { + 
b.metricWritten(m) + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *MemoryBuffer) Reject(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + if len(batch) == 0 { + return + } + + free := b.cap - b.size + restore := min(len(batch), free) + skip := len(batch) - restore + + b.first = b.prevby(b.first, restore) + b.size = min(b.size+restore, b.cap) + + re := b.first + + // Copy metrics from the batch back into the buffer + for i := range batch { + if i < skip { + b.metricDropped(batch[i]) + } else { + b.buf[re] = batch[i] + re = b.next(re) + } + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *MemoryBuffer) Stats() BufferStats { + return b.BufferStats +} + +// next returns the next index with wrapping. +func (b *MemoryBuffer) next(index int) int { + index++ + if index == b.cap { + return 0 + } + return index +} + +// nextby returns the index that is count newer with wrapping. +func (b *MemoryBuffer) nextby(index, count int) int { + index += count + index %= b.cap + return index +} + +// prevby returns the index that is count older with wrapping. +func (b *MemoryBuffer) prevby(index, count int) int { + index -= count + for index < 0 { + index += b.cap + } + + index %= b.cap + return index +} + +func (b *MemoryBuffer) resetBatch() { + b.batchFirst = 0 + b.batchSize = 0 +} diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go new file mode 100644 index 000000000..12803184f --- /dev/null +++ b/models/buffer_mem_test.go @@ -0,0 +1,25 @@ +package models + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { + t.Helper() + buf, err := NewBuffer("test", "", capacity, "memory", "") + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func BenchmarkAddMetrics(b *testing.B) { + buf := newTestMemoryBuffer(b, 10000) + m := Metric() + for n := 0; n < b.N; n++ { + buf.Add(m) + } +} diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go new file mode 100644 index 000000000..19df31355 --- /dev/null +++ b/models/buffer_suite_test.go @@ -0,0 +1,817 @@ +package models + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +type MockMetric struct { + telegraf.Metric + AcceptF func() + RejectF func() + DropF func() +} + +func (m *MockMetric) Accept() { + m.AcceptF() +} + +func (m *MockMetric) Reject() { + m.RejectF() +} + +func (m *MockMetric) Drop() { + m.DropF() +} + +type BufferSuiteTest struct { + suite.Suite + bufferType string + bufferPath string + + hasMaxCapacity bool // whether the buffer type being tested supports a maximum metric capacity +} + +func (s *BufferSuiteTest) SetupTest() { + switch s.bufferType { + case "", "memory": + s.hasMaxCapacity = true + } +} + +func (s *BufferSuiteTest) TearDownTest() { + if s.bufferPath != "" { + os.RemoveAll(s.bufferPath) + s.bufferPath = "" + } +} + +func TestMemoryBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "memory"}) +} + +func Metric() telegraf.Metric { + return MetricTime(0) +} + +func MetricTime(sec int64) telegraf.Metric { + m := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(sec, 0), + ) + return m +} + +func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer 
{ + s.T().Helper() + buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath) + s.Require().NoError(err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func (s *BufferSuiteTest) TestBuffer_LenEmpty() { + b := s.newTestBuffer(5) + + s.Equal(0, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenOne() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m) + + s.Equal(1, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenFull() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_LenOverfill() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m, m) + + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenZero() { + b := s.newTestBuffer(5) + batch := b.Batch(0) + + s.Empty(batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenBufferEmpty() { + b := s.newTestBuffer(5) + batch := b.Batch(2) + + s.Empty(batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenUnderfill() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m) + batch := b.Batch(2) + + s.Len(batch, 1) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenFill() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + s.Len(batch, 2) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenExact() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m) + batch := b.Batch(2) + s.Len(batch, 2) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLenLargerThanBuffer() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(6) + s.Len(batch, 5) +} + +func (s *BufferSuiteTest) TestBuffer_BatchWrap() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(2) + b.Accept(batch) + b.Add(m, m) + batch = b.Batch(5) + s.Len(batch, 5) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLatest() { + b := s.newTestBuffer(4) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_BatchLatestWrap() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(4) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(2), + MetricTime(3), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_MultipleBatch() { + b := s.newTestBuffer(10) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + batch := b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) + b.Accept(batch) + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(6), + }, batch) + b.Accept(batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectWithRoom() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + 
batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNothingNewFull() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNoRoom() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + + b.Reject(batch) + + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), + MetricTime(8), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectRoomExact() { + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + batch := b.Batch(2) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Reject(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectRoomOverwriteOld() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(1) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + + b.Reject(batch) + + s.Equal(int64(1), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectPartialRoom() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Reject(batch) + + s.Equal(int64(2), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectNewMetricsWrapped() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + // buffer: 1, 4, 5; batch: 2, 3 + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + + // buffer: 
8, 9, 10, 6, 7; batch: 2, 3 + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + // buffer: 13, 14, 15, 11, 12; batch: 2, 3 + s.Equal(int64(8), b.Stats().MetricsDropped.Get()) + b.Reject(batch) + + s.Equal(int64(10), b.Stats().MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectWrapped() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(5) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + batch := b.Batch(3) + + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + + b.Reject(batch) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(8), + MetricTime(9), + MetricTime(10), + MetricTime(11), + MetricTime(12), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_RejectAdjustFirst() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + b := s.newTestBuffer(10) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(3) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Reject(batch) + + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + batch = b.Batch(3) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Reject(batch) + + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + batch = b.Batch(3) + b.Add(MetricTime(16)) + b.Add(MetricTime(17)) + b.Add(MetricTime(18)) + b.Reject(batch) + + b.Add(MetricTime(19)) + + batch = b.Batch(10) + testutil.RequireMetricsEqual(s.T(), + []telegraf.Metric{ + MetricTime(10), + MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), + MetricTime(16), + MetricTime(17), + MetricTime(18), + MetricTime(19), + }, batch) +} + +func (s *BufferSuiteTest) TestBuffer_AddDropsOverwrittenMetrics() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + b.Add(m, m, m, m, m) + + s.Equal(int64(5), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_AcceptRemovesBatch() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + b.Accept(batch) + s.Equal(1, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_RejectLeavesBatch() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m) + batch := b.Batch(2) + b.Reject(batch) + s.Equal(3, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_AcceptWritesOverwrittenBatch() { + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(5) + b.Add(m, m, m, m, m) + b.Accept(batch) + + s.Equal(int64(0), b.Stats().MetricsDropped.Get()) + s.Equal(int64(5), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchRejectDropsOverwrittenBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, 
m, m, m, m) + batch := b.Batch(5) + b.Add(m, m, m, m, m) + b.Reject(batch) + + s.Equal(int64(5), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchAccept() { + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m) + b.Accept(batch) + s.Equal(int64(0), b.Stats().MetricsDropped.Get(), "dropped") + s.Equal(int64(3), b.Stats().MetricsWritten.Get(), "written") +} + +func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchReject() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m) + b.Reject(batch) + s.Equal(int64(3), b.Stats().MetricsDropped.Get()) + s.Equal(int64(0), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_MetricsBatchAcceptRemoved() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m, m, m) + batch := b.Batch(3) + b.Add(m, m, m, m, m) + b.Accept(batch) + s.Equal(int64(2), b.Stats().MetricsDropped.Get()) + s.Equal(int64(3), b.Stats().MetricsWritten.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_WrapWithBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + m := Metric() + b := s.newTestBuffer(5) + + b.Add(m, m, m) + b.Batch(3) + b.Add(m, m, m, m, m, m) + + s.Equal(int64(1), b.Stats().MetricsDropped.Get()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchNotRemoved() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + b.Batch(2) + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { + m := Metric() + b := s.newTestBuffer(5) + b.Add(m, m, m, m, m) + batch := b.Batch(2) + b.Reject(batch) + b.Accept(batch) + s.Equal(5, b.Len()) +} + +func (s *BufferSuiteTest) TestBuffer_AcceptCallsMetricAccept() { + var accept int + mm := &MockMetric{ + Metric: Metric(), + AcceptF: func() { + accept++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm) + batch := b.Batch(2) + b.Accept(batch) + s.Equal(2, accept) +} + +func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm) + s.Equal(2, reject) +} + +func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNotInBatch() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(2) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + b.Reject(batch) + s.Equal(4, reject) +} + +func (s *BufferSuiteTest) TestBuffer_RejectCallsMetricRejectWithOverwritten() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(5) + b.Add(mm, mm) + s.Equal(0, reject) + b.Reject(batch) + s.Equal(2, reject) +} + +func (s *BufferSuiteTest) 
TestBuffer_AddOverwriteAndReject() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm, mm, mm) + batch := b.Batch(5) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + b.Add(mm, mm, mm, mm, mm) + s.Equal(15, reject) + b.Reject(batch) + s.Equal(20, reject) +} + +func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() { + if !s.hasMaxCapacity { + s.T().Skip("tested buffer does not have a maximum capacity") + } + + var reject int + var accept int + mm := &MockMetric{ + Metric: Metric(), + RejectF: func() { + reject++ + }, + AcceptF: func() { + accept++ + }, + } + b := s.newTestBuffer(5) + b.Add(mm, mm, mm) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + batch := b.Batch(5) + b.Add(mm, mm, mm, mm) + s.Equal(2, reject) + b.Add(mm, mm, mm, mm) + s.Equal(5, reject) + b.Add(mm, mm, mm, mm) + s.Equal(9, reject) + b.Add(mm, mm, mm, mm) + s.Equal(13, reject) + b.Accept(batch) + s.Equal(13, reject) + s.Equal(5, accept) +} + +func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() { + b := s.newTestBuffer(5) + batch := b.Batch(2) + b.Add(MetricTime(1)) + b.Reject(batch) + b.Add(MetricTime(2)) + batch = b.Batch(2) + for _, m := range batch { + s.NotNil(m) + } +} diff --git a/models/buffer_test.go b/models/buffer_test.go deleted file mode 100644 index 276b5c47c..000000000 --- a/models/buffer_test.go +++ /dev/null @@ -1,726 +0,0 @@ -package models - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/testutil" -) - -type MockMetric struct { - telegraf.Metric - AcceptF func() - RejectF func() - DropF func() -} - -func (m *MockMetric) Accept() { - m.AcceptF() -} - -func (m *MockMetric) Reject() { - m.RejectF() -} - -func (m *MockMetric) Drop() { - m.DropF() -} - -func Metric() telegraf.Metric { - return MetricTime(0) -} - -func MetricTime(sec int64) telegraf.Metric { - m := metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(sec, 0), - ) - return m -} - -func BenchmarkAddMetrics(b *testing.B) { - buf := NewBuffer("test", "", 10000) - m := Metric() - for n := 0; n < b.N; n++ { - buf.Add(m) - } -} - -func setup(b *Buffer) *Buffer { - b.MetricsAdded.Set(0) - b.MetricsWritten.Set(0) - b.MetricsDropped.Set(0) - return b -} - -func TestBuffer_LenEmpty(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - - require.Equal(t, 0, b.Len()) -} - -func TestBuffer_LenOne(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m) - - require.Equal(t, 1, b.Len()) -} - -func TestBuffer_LenFull(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_LenOverfill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(m, m, m, m, m, m) - - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_BatchLenZero(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(0) - - require.Empty(t, batch) -} - -func TestBuffer_BatchLenBufferEmpty(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(2) - - require.Empty(t, batch) -} - -func TestBuffer_BatchLenUnderfill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - 
b.Add(m) - batch := b.Batch(2) - - require.Len(t, batch, 1) -} - -func TestBuffer_BatchLenFill(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - require.Len(t, batch, 2) -} - -func TestBuffer_BatchLenExact(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m) - batch := b.Batch(2) - require.Len(t, batch, 2) -} - -func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(6) - require.Len(t, batch, 5) -} - -func TestBuffer_BatchWrap(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Accept(batch) - b.Add(m, m) - batch = b.Batch(5) - require.Len(t, batch, 5) -} - -func TestBuffer_BatchLatest(t *testing.T) { - b := setup(NewBuffer("test", "", 4)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - }, batch) -} - -func TestBuffer_BatchLatestWrap(t *testing.T) { - b := setup(NewBuffer("test", "", 4)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) - - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(2), - MetricTime(3), - }, batch) -} - -func TestBuffer_MultipleBatch(t *testing.T) { - b := setup(NewBuffer("test", "", 10)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - batch := b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) - b.Accept(batch) - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(6), - }, batch) - b.Accept(batch) -} - -func TestBuffer_RejectWithRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectNothingNewFull(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - batch := b.Batch(2) - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectNoRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - - b.Reject(batch) - - require.Equal(t, int64(3), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), - MetricTime(8), - }, batch) -} - -func TestBuffer_RejectRoomExact(t *testing.T) { - b := 
setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - batch := b.Batch(2) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - b.Reject(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - }, batch) -} - -func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(1) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - - b.Reject(batch) - - require.Equal(t, int64(1), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(2), - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), - }, batch) -} - -func TestBuffer_RejectPartialRoom(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Reject(batch) - - require.Equal(t, int64(2), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(3), - MetricTime(4), - MetricTime(5), - MetricTime(6), - MetricTime(7), - }, batch) -} - -func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(2) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - // buffer: 1, 4, 5; batch: 2, 3 - require.Equal(t, int64(0), b.MetricsDropped.Get()) - - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) - - // buffer: 8, 9, 10, 6, 7; batch: 2, 3 - require.Equal(t, int64(3), b.MetricsDropped.Get()) - - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Add(MetricTime(13)) - b.Add(MetricTime(14)) - b.Add(MetricTime(15)) - // buffer: 13, 14, 15, 11, 12; batch: 2, 3 - require.Equal(t, int64(8), b.MetricsDropped.Get()) - b.Reject(batch) - - require.Equal(t, int64(10), b.MetricsDropped.Get()) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), - }, batch) -} - -func TestBuffer_RejectWrapped(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - - b.Add(MetricTime(6)) - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - batch := b.Batch(3) - - b.Add(MetricTime(9)) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - - b.Reject(batch) - - batch = b.Batch(5) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(8), - MetricTime(9), - MetricTime(10), - MetricTime(11), - MetricTime(12), - }, batch) -} - -func TestBuffer_RejectAdjustFirst(t *testing.T) { - b := setup(NewBuffer("test", "", 10)) - b.Add(MetricTime(1)) - b.Add(MetricTime(2)) - b.Add(MetricTime(3)) - batch := b.Batch(3) - b.Add(MetricTime(4)) - b.Add(MetricTime(5)) - b.Add(MetricTime(6)) - b.Reject(batch) - - b.Add(MetricTime(7)) - b.Add(MetricTime(8)) - b.Add(MetricTime(9)) - batch = b.Batch(3) - b.Add(MetricTime(10)) - b.Add(MetricTime(11)) - b.Add(MetricTime(12)) - b.Reject(batch) - - b.Add(MetricTime(13)) - 
b.Add(MetricTime(14)) - b.Add(MetricTime(15)) - batch = b.Batch(3) - b.Add(MetricTime(16)) - b.Add(MetricTime(17)) - b.Add(MetricTime(18)) - b.Reject(batch) - - b.Add(MetricTime(19)) - - batch = b.Batch(10) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(10), - MetricTime(11), - MetricTime(12), - MetricTime(13), - MetricTime(14), - MetricTime(15), - MetricTime(16), - MetricTime(17), - MetricTime(18), - MetricTime(19), - }, batch) -} - -func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - b.Add(m, m, m, m, m) - - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_AcceptRemovesBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - b.Accept(batch) - require.Equal(t, 1, b.Len()) -} - -func TestBuffer_RejectLeavesBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m) - batch := b.Batch(2) - b.Reject(batch) - require.Equal(t, 3, b.Len()) -} - -func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Accept(batch) - - require.Equal(t, int64(0), b.MetricsDropped.Get()) - require.Equal(t, int64(5), b.MetricsWritten.Get()) -} - -func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(5) - b.Add(m, m, m, m, m) - b.Reject(batch) - - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Accept(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped") - require.Equal(t, int64(3), b.MetricsWritten.Get(), "written") -} - -func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m) - b.Reject(batch) - require.Equal(t, int64(3), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) -} - -func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m, m, m) - batch := b.Batch(3) - b.Add(m, m, m, m, m) - b.Accept(batch) - require.Equal(t, int64(2), b.MetricsDropped.Get()) - require.Equal(t, int64(3), b.MetricsWritten.Get()) -} - -func TestBuffer_WrapWithBatch(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - - b.Add(m, m, m) - b.Batch(3) - b.Add(m, m, m, m, m, m) - - require.Equal(t, int64(1), b.MetricsDropped.Get()) -} - -func TestBuffer_BatchNotRemoved(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - b.Batch(2) - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { - m := Metric() - b := setup(NewBuffer("test", "", 5)) - b.Add(m, m, m, m, m) - batch := b.Batch(2) - b.Reject(batch) - b.Accept(batch) - require.Equal(t, 5, b.Len()) -} - -func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { - var accept int - mm := &MockMetric{ - Metric: Metric(), - AcceptF: func() { - accept++ - }, - } - b := setup(NewBuffer("test", "", 5)) - 
b.Add(mm, mm, mm) - batch := b.Batch(2) - b.Accept(batch) - require.Equal(t, 2, accept) -} - -func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm) - require.Equal(t, 2, reject) -} - -func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - setup(b) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(2) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - b.Reject(batch) - require.Equal(t, 4, reject) -} - -func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm) - require.Equal(t, 0, reject) - b.Reject(batch) - require.Equal(t, 2, reject) -} - -func TestBuffer_AddOverwriteAndReject(t *testing.T) { - var reject int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm, mm, mm) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - b.Add(mm, mm, mm, mm, mm) - require.Equal(t, 15, reject) - b.Reject(batch) - require.Equal(t, 20, reject) -} - -func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) { - var reject int - var accept int - mm := &MockMetric{ - Metric: Metric(), - RejectF: func() { - reject++ - }, - AcceptF: func() { - accept++ - }, - } - b := setup(NewBuffer("test", "", 5)) - b.Add(mm, mm, mm) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - batch := b.Batch(5) - b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 5, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 9, reject) - b.Add(mm, mm, mm, mm) - require.Equal(t, 13, reject) - b.Accept(batch) - require.Equal(t, 13, reject) - require.Equal(t, 5, accept) -} - -func TestBuffer_RejectEmptyBatch(t *testing.T) { - b := setup(NewBuffer("test", "", 5)) - batch := b.Batch(2) - b.Add(MetricTime(1)) - b.Reject(batch) - b.Add(MetricTime(2)) - batch = b.Batch(2) - for _, m := range batch { - require.NotNil(t, m) - } -} diff --git a/models/running_output.go b/models/running_output.go index ada1f745f..0374de04d 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -37,6 +37,9 @@ type OutputConfig struct { NameOverride string NamePrefix string NameSuffix string + + BufferStrategy string + BufferDirectory string } // RunningOutput contains the output configuration @@ -56,7 +59,7 @@ type RunningOutput struct { BatchReady chan time.Time - buffer *Buffer + buffer Buffer log telegraf.Logger started bool @@ -96,8 +99,13 @@ func NewRunningOutput( batchSize = DefaultMetricBatchSize } + b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) + if err != nil { + panic(err) + } + ro := &RunningOutput{ - buffer: NewBuffer(config.Name, config.Alias, bufferLimit), + buffer: b, BatchReady: make(chan time.Time, 1), Output: output, Config: config,
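
For reviewers, a minimal usage sketch of the new factory, interface, and stats accessor (illustrative only, not part of this patch; it assumes the models package is importable from the caller and relies solely on the API introduced above):

    package main

    import (
        "fmt"
        "time"

        "github.com/influxdata/telegraf/metric"
        "github.com/influxdata/telegraf/models"
    )

    func main() {
        // "memory" (or "") selects the circular in-memory buffer; the final
        // argument is reserved for strategies that need an on-disk directory.
        buf, err := models.NewBuffer("example", "", 10, "memory", "")
        if err != nil {
            panic(err)
        }

        m := metric.New(
            "cpu",
            map[string]string{},
            map[string]interface{}{"value": 42.0},
            time.Unix(0, 0),
        )

        buf.Add(m, m, m)      // Add reports how many metrics were dropped
        batch := buf.Batch(2) // oldest-first, at most two metrics
        buf.Accept(batch)     // mark the batch as successfully written

        fmt.Println(buf.Len(), buf.Stats().MetricsWritten.Get()) // 1 2
    }

As the BufferStats comment notes, any future strategy (for example a disk-backed one selected via BufferStrategy/BufferDirectory) is expected to embed BufferStats so the selfstat counters stay uniform across implementations.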