chore(agent): Extract buffer into interface (#15545)
This commit is contained in:
parent
5a46c0aeaf
commit
6da035ba44
227
models/buffer.go
227
models/buffer.go
|
|
@ -1,7 +1,7 @@
|
||||||
package models
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"fmt"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/selfstat"
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
|
|
@ -12,18 +12,31 @@ var (
|
||||||
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
|
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
|
||||||
)
|
)
|
||||||
|
|
||||||
// Buffer stores metrics in a circular buffer.
|
type Buffer interface {
|
||||||
type Buffer struct {
|
// Len returns the number of metrics currently in the buffer.
|
||||||
sync.Mutex
|
Len() int
|
||||||
buf []telegraf.Metric
|
|
||||||
first int // index of the first/oldest metric
|
|
||||||
last int // one after the index of the last/newest metric
|
|
||||||
size int // number of metrics currently in the buffer
|
|
||||||
cap int // the capacity of the buffer
|
|
||||||
|
|
||||||
batchFirst int // index of the first metric in the batch
|
// Add adds metrics to the buffer and returns number of dropped metrics.
|
||||||
batchSize int // number of metrics currently in the batch
|
Add(metrics ...telegraf.Metric) int
|
||||||
|
|
||||||
|
// Batch returns a slice containing up to batchSize of the oldest metrics not
|
||||||
|
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
|
||||||
|
// batch must not be modified by the client.
|
||||||
|
Batch(batchSize int) []telegraf.Metric
|
||||||
|
|
||||||
|
// Accept marks the batch, acquired from Batch(), as successfully written.
|
||||||
|
Accept(metrics []telegraf.Metric)
|
||||||
|
|
||||||
|
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
|
||||||
|
// as unsent.
|
||||||
|
Reject([]telegraf.Metric)
|
||||||
|
|
||||||
|
Stats() BufferStats
|
||||||
|
}
|
||||||
|
|
||||||
|
// BufferStats holds common metrics used for buffer implementations.
|
||||||
|
// Implementations of Buffer should embed this struct in them.
|
||||||
|
type BufferStats struct {
|
||||||
MetricsAdded selfstat.Stat
|
MetricsAdded selfstat.Stat
|
||||||
MetricsWritten selfstat.Stat
|
MetricsWritten selfstat.Stat
|
||||||
MetricsDropped selfstat.Stat
|
MetricsDropped selfstat.Stat
|
||||||
|
|
@ -32,19 +45,23 @@ type Buffer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBuffer returns a new empty Buffer with the given capacity.
|
// NewBuffer returns a new empty Buffer with the given capacity.
|
||||||
func NewBuffer(name string, alias string, capacity int) *Buffer {
|
func NewBuffer(name string, alias string, capacity int, strategy string, _ string) (Buffer, error) {
|
||||||
|
bs := NewBufferStats(name, alias, capacity)
|
||||||
|
|
||||||
|
switch strategy {
|
||||||
|
case "", "memory":
|
||||||
|
return NewMemoryBuffer(capacity, bs)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBufferStats(name string, alias string, capacity int) BufferStats {
|
||||||
tags := map[string]string{"output": name}
|
tags := map[string]string{"output": name}
|
||||||
if alias != "" {
|
if alias != "" {
|
||||||
tags["alias"] = alias
|
tags["alias"] = alias
|
||||||
}
|
}
|
||||||
|
|
||||||
b := &Buffer{
|
bs := BufferStats{
|
||||||
buf: make([]telegraf.Metric, capacity),
|
|
||||||
first: 0,
|
|
||||||
last: 0,
|
|
||||||
size: 0,
|
|
||||||
cap: capacity,
|
|
||||||
|
|
||||||
MetricsAdded: selfstat.Register(
|
MetricsAdded: selfstat.Register(
|
||||||
"write",
|
"write",
|
||||||
"metrics_added",
|
"metrics_added",
|
||||||
|
|
@ -71,183 +88,23 @@ func NewBuffer(name string, alias string, capacity int) *Buffer {
|
||||||
tags,
|
tags,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
b.BufferSize.Set(int64(0))
|
bs.BufferSize.Set(int64(0))
|
||||||
b.BufferLimit.Set(int64(capacity))
|
bs.BufferLimit.Set(int64(capacity))
|
||||||
return b
|
return bs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the number of metrics currently in the buffer.
|
func (b *BufferStats) metricAdded() {
|
||||||
func (b *Buffer) Len() int {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
return b.length()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Buffer) length() int {
|
|
||||||
return min(b.size+b.batchSize, b.cap)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Buffer) metricAdded() {
|
|
||||||
b.MetricsAdded.Incr(1)
|
b.MetricsAdded.Incr(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) metricWritten(metric telegraf.Metric) {
|
func (b *BufferStats) metricWritten(metric telegraf.Metric) {
|
||||||
AgentMetricsWritten.Incr(1)
|
AgentMetricsWritten.Incr(1)
|
||||||
b.MetricsWritten.Incr(1)
|
b.MetricsWritten.Incr(1)
|
||||||
metric.Accept()
|
metric.Accept()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) metricDropped(metric telegraf.Metric) {
|
func (b *BufferStats) metricDropped(metric telegraf.Metric) {
|
||||||
AgentMetricsDropped.Incr(1)
|
AgentMetricsDropped.Incr(1)
|
||||||
b.MetricsDropped.Incr(1)
|
b.MetricsDropped.Incr(1)
|
||||||
metric.Reject()
|
metric.Reject()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) addMetric(m telegraf.Metric) int {
|
|
||||||
dropped := 0
|
|
||||||
// Check if Buffer is full
|
|
||||||
if b.size == b.cap {
|
|
||||||
b.metricDropped(b.buf[b.last])
|
|
||||||
dropped++
|
|
||||||
|
|
||||||
if b.batchSize > 0 {
|
|
||||||
b.batchSize--
|
|
||||||
b.batchFirst = b.next(b.batchFirst)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
b.metricAdded()
|
|
||||||
|
|
||||||
b.buf[b.last] = m
|
|
||||||
b.last = b.next(b.last)
|
|
||||||
|
|
||||||
if b.size == b.cap {
|
|
||||||
b.first = b.next(b.first)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.size = min(b.size+1, b.cap)
|
|
||||||
return dropped
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add adds metrics to the buffer and returns number of dropped metrics.
|
|
||||||
func (b *Buffer) Add(metrics ...telegraf.Metric) int {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
dropped := 0
|
|
||||||
for i := range metrics {
|
|
||||||
if n := b.addMetric(metrics[i]); n != 0 {
|
|
||||||
dropped += n
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
b.BufferSize.Set(int64(b.length()))
|
|
||||||
return dropped
|
|
||||||
}
|
|
||||||
|
|
||||||
// Batch returns a slice containing up to batchSize of the oldest metrics not
|
|
||||||
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
|
|
||||||
// batch must not be modified by the client.
|
|
||||||
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
outLen := min(b.size, batchSize)
|
|
||||||
out := make([]telegraf.Metric, outLen)
|
|
||||||
if outLen == 0 {
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
b.batchFirst = b.first
|
|
||||||
b.batchSize = outLen
|
|
||||||
|
|
||||||
batchIndex := b.batchFirst
|
|
||||||
for i := range out {
|
|
||||||
out[i] = b.buf[batchIndex]
|
|
||||||
b.buf[batchIndex] = nil
|
|
||||||
batchIndex = b.next(batchIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.first = b.nextby(b.first, b.batchSize)
|
|
||||||
b.size -= outLen
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept marks the batch, acquired from Batch(), as successfully written.
|
|
||||||
func (b *Buffer) Accept(batch []telegraf.Metric) {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
for _, m := range batch {
|
|
||||||
b.metricWritten(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.resetBatch()
|
|
||||||
b.BufferSize.Set(int64(b.length()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
|
|
||||||
// as unsent.
|
|
||||||
func (b *Buffer) Reject(batch []telegraf.Metric) {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
if len(batch) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
free := b.cap - b.size
|
|
||||||
restore := min(len(batch), free)
|
|
||||||
skip := len(batch) - restore
|
|
||||||
|
|
||||||
b.first = b.prevby(b.first, restore)
|
|
||||||
b.size = min(b.size+restore, b.cap)
|
|
||||||
|
|
||||||
re := b.first
|
|
||||||
|
|
||||||
// Copy metrics from the batch back into the buffer
|
|
||||||
for i := range batch {
|
|
||||||
if i < skip {
|
|
||||||
b.metricDropped(batch[i])
|
|
||||||
} else {
|
|
||||||
b.buf[re] = batch[i]
|
|
||||||
re = b.next(re)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
b.resetBatch()
|
|
||||||
b.BufferSize.Set(int64(b.length()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// next returns the next index with wrapping.
|
|
||||||
func (b *Buffer) next(index int) int {
|
|
||||||
index++
|
|
||||||
if index == b.cap {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return index
|
|
||||||
}
|
|
||||||
|
|
||||||
// nextby returns the index that is count newer with wrapping.
|
|
||||||
func (b *Buffer) nextby(index, count int) int {
|
|
||||||
index += count
|
|
||||||
index %= b.cap
|
|
||||||
return index
|
|
||||||
}
|
|
||||||
|
|
||||||
// prevby returns the index that is count older with wrapping.
|
|
||||||
func (b *Buffer) prevby(index, count int) int {
|
|
||||||
index -= count
|
|
||||||
for index < 0 {
|
|
||||||
index += b.cap
|
|
||||||
}
|
|
||||||
|
|
||||||
index %= b.cap
|
|
||||||
return index
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Buffer) resetBatch() {
|
|
||||||
b.batchFirst = 0
|
|
||||||
b.batchSize = 0
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,186 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MemoryBuffer stores metrics in a circular buffer.
|
||||||
|
type MemoryBuffer struct {
|
||||||
|
sync.Mutex
|
||||||
|
BufferStats
|
||||||
|
|
||||||
|
buf []telegraf.Metric
|
||||||
|
first int // index of the first/oldest metric
|
||||||
|
last int // one after the index of the last/newest metric
|
||||||
|
size int // number of metrics currently in the buffer
|
||||||
|
cap int // the capacity of the buffer
|
||||||
|
|
||||||
|
batchFirst int // index of the first metric in the batch
|
||||||
|
batchSize int // number of metrics currently in the batch
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMemoryBuffer(capacity int, stats BufferStats) (*MemoryBuffer, error) {
|
||||||
|
return &MemoryBuffer{
|
||||||
|
BufferStats: stats,
|
||||||
|
buf: make([]telegraf.Metric, capacity),
|
||||||
|
cap: capacity,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Len() int {
|
||||||
|
b.Lock()
|
||||||
|
defer b.Unlock()
|
||||||
|
|
||||||
|
return b.length()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) length() int {
|
||||||
|
return min(b.size+b.batchSize, b.cap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) addMetric(m telegraf.Metric) int {
|
||||||
|
dropped := 0
|
||||||
|
// Check if Buffer is full
|
||||||
|
if b.size == b.cap {
|
||||||
|
b.metricDropped(b.buf[b.last])
|
||||||
|
dropped++
|
||||||
|
|
||||||
|
if b.batchSize > 0 {
|
||||||
|
b.batchSize--
|
||||||
|
b.batchFirst = b.next(b.batchFirst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b.metricAdded()
|
||||||
|
|
||||||
|
b.buf[b.last] = m
|
||||||
|
b.last = b.next(b.last)
|
||||||
|
|
||||||
|
if b.size == b.cap {
|
||||||
|
b.first = b.next(b.first)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.size = min(b.size+1, b.cap)
|
||||||
|
return dropped
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int {
|
||||||
|
b.Lock()
|
||||||
|
defer b.Unlock()
|
||||||
|
|
||||||
|
dropped := 0
|
||||||
|
for i := range metrics {
|
||||||
|
if n := b.addMetric(metrics[i]); n != 0 {
|
||||||
|
dropped += n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b.BufferSize.Set(int64(b.length()))
|
||||||
|
return dropped
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Batch(batchSize int) []telegraf.Metric {
|
||||||
|
b.Lock()
|
||||||
|
defer b.Unlock()
|
||||||
|
|
||||||
|
outLen := min(b.size, batchSize)
|
||||||
|
out := make([]telegraf.Metric, outLen)
|
||||||
|
if outLen == 0 {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
b.batchFirst = b.first
|
||||||
|
b.batchSize = outLen
|
||||||
|
|
||||||
|
batchIndex := b.batchFirst
|
||||||
|
for i := range out {
|
||||||
|
out[i] = b.buf[batchIndex]
|
||||||
|
b.buf[batchIndex] = nil
|
||||||
|
batchIndex = b.next(batchIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.first = b.nextby(b.first, b.batchSize)
|
||||||
|
b.size -= outLen
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Accept(batch []telegraf.Metric) {
|
||||||
|
b.Lock()
|
||||||
|
defer b.Unlock()
|
||||||
|
|
||||||
|
for _, m := range batch {
|
||||||
|
b.metricWritten(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.resetBatch()
|
||||||
|
b.BufferSize.Set(int64(b.length()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Reject(batch []telegraf.Metric) {
|
||||||
|
b.Lock()
|
||||||
|
defer b.Unlock()
|
||||||
|
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
free := b.cap - b.size
|
||||||
|
restore := min(len(batch), free)
|
||||||
|
skip := len(batch) - restore
|
||||||
|
|
||||||
|
b.first = b.prevby(b.first, restore)
|
||||||
|
b.size = min(b.size+restore, b.cap)
|
||||||
|
|
||||||
|
re := b.first
|
||||||
|
|
||||||
|
// Copy metrics from the batch back into the buffer
|
||||||
|
for i := range batch {
|
||||||
|
if i < skip {
|
||||||
|
b.metricDropped(batch[i])
|
||||||
|
} else {
|
||||||
|
b.buf[re] = batch[i]
|
||||||
|
re = b.next(re)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
b.resetBatch()
|
||||||
|
b.BufferSize.Set(int64(b.length()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) Stats() BufferStats {
|
||||||
|
return b.BufferStats
|
||||||
|
}
|
||||||
|
|
||||||
|
// next returns the next index with wrapping.
|
||||||
|
func (b *MemoryBuffer) next(index int) int {
|
||||||
|
index++
|
||||||
|
if index == b.cap {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
|
// nextby returns the index that is count newer with wrapping.
|
||||||
|
func (b *MemoryBuffer) nextby(index, count int) int {
|
||||||
|
index += count
|
||||||
|
index %= b.cap
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
|
// prevby returns the index that is count older with wrapping.
|
||||||
|
func (b *MemoryBuffer) prevby(index, count int) int {
|
||||||
|
index -= count
|
||||||
|
for index < 0 {
|
||||||
|
index += b.cap
|
||||||
|
}
|
||||||
|
|
||||||
|
index %= b.cap
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *MemoryBuffer) resetBatch() {
|
||||||
|
b.batchFirst = 0
|
||||||
|
b.batchSize = 0
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestMemoryBuffer(t testing.TB, capacity int) Buffer {
|
||||||
|
t.Helper()
|
||||||
|
buf, err := NewBuffer("test", "", capacity, "memory", "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf.Stats().MetricsAdded.Set(0)
|
||||||
|
buf.Stats().MetricsWritten.Set(0)
|
||||||
|
buf.Stats().MetricsDropped.Set(0)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkAddMetrics(b *testing.B) {
|
||||||
|
buf := newTestMemoryBuffer(b, 10000)
|
||||||
|
m := Metric()
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
buf.Add(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,817 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MockMetric struct {
|
||||||
|
telegraf.Metric
|
||||||
|
AcceptF func()
|
||||||
|
RejectF func()
|
||||||
|
DropF func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockMetric) Accept() {
|
||||||
|
m.AcceptF()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockMetric) Reject() {
|
||||||
|
m.RejectF()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockMetric) Drop() {
|
||||||
|
m.DropF()
|
||||||
|
}
|
||||||
|
|
||||||
|
type BufferSuiteTest struct {
|
||||||
|
suite.Suite
|
||||||
|
bufferType string
|
||||||
|
bufferPath string
|
||||||
|
|
||||||
|
hasMaxCapacity bool // whether the buffer type being tested supports a maximum metric capacity
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) SetupTest() {
|
||||||
|
switch s.bufferType {
|
||||||
|
case "", "memory":
|
||||||
|
s.hasMaxCapacity = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TearDownTest() {
|
||||||
|
if s.bufferPath != "" {
|
||||||
|
os.RemoveAll(s.bufferPath)
|
||||||
|
s.bufferPath = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryBufferSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &BufferSuiteTest{bufferType: "memory"})
|
||||||
|
}
|
||||||
|
|
||||||
|
func Metric() telegraf.Metric {
|
||||||
|
return MetricTime(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func MetricTime(sec int64) telegraf.Metric {
|
||||||
|
m := metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(sec, 0),
|
||||||
|
)
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
|
||||||
|
s.T().Helper()
|
||||||
|
buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
buf.Stats().MetricsAdded.Set(0)
|
||||||
|
buf.Stats().MetricsWritten.Set(0)
|
||||||
|
buf.Stats().MetricsDropped.Set(0)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_LenEmpty() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
s.Equal(0, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_LenOne() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m)
|
||||||
|
|
||||||
|
s.Equal(1, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_LenFull() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
|
||||||
|
s.Equal(5, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_LenOverfill() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m, m)
|
||||||
|
|
||||||
|
s.Equal(5, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenZero() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
batch := b.Batch(0)
|
||||||
|
|
||||||
|
s.Empty(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenBufferEmpty() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
s.Empty(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenUnderfill() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
s.Len(batch, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenFill() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
s.Len(batch, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenExact() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
s.Len(batch, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLenLargerThanBuffer() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(6)
|
||||||
|
s.Len(batch, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchWrap() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Accept(batch)
|
||||||
|
b.Add(m, m)
|
||||||
|
batch = b.Batch(5)
|
||||||
|
s.Len(batch, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLatest() {
|
||||||
|
b := s.newTestBuffer(4)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(1),
|
||||||
|
MetricTime(2),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchLatestWrap() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(4)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_MultipleBatch() {
|
||||||
|
b := s.newTestBuffer(10)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
batch := b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(1),
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
}, batch)
|
||||||
|
b.Accept(batch)
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(6),
|
||||||
|
}, batch)
|
||||||
|
b.Accept(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectWithRoom() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(1),
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectNothingNewFull() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(1),
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectNoRoom() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
b.Add(MetricTime(7))
|
||||||
|
b.Add(MetricTime(8))
|
||||||
|
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(3), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
MetricTime(6),
|
||||||
|
MetricTime(7),
|
||||||
|
MetricTime(8),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectRoomExact() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(1),
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectRoomOverwriteOld() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(1)
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(1), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(2),
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
MetricTime(6),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectPartialRoom() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
b.Add(MetricTime(7))
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(2), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(3),
|
||||||
|
MetricTime(4),
|
||||||
|
MetricTime(5),
|
||||||
|
MetricTime(6),
|
||||||
|
MetricTime(7),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectNewMetricsWrapped() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
|
||||||
|
// buffer: 1, 4, 5; batch: 2, 3
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
b.Add(MetricTime(7))
|
||||||
|
b.Add(MetricTime(8))
|
||||||
|
b.Add(MetricTime(9))
|
||||||
|
b.Add(MetricTime(10))
|
||||||
|
|
||||||
|
// buffer: 8, 9, 10, 6, 7; batch: 2, 3
|
||||||
|
s.Equal(int64(3), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
b.Add(MetricTime(11))
|
||||||
|
b.Add(MetricTime(12))
|
||||||
|
b.Add(MetricTime(13))
|
||||||
|
b.Add(MetricTime(14))
|
||||||
|
b.Add(MetricTime(15))
|
||||||
|
// buffer: 13, 14, 15, 11, 12; batch: 2, 3
|
||||||
|
s.Equal(int64(8), b.Stats().MetricsDropped.Get())
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(10), b.Stats().MetricsDropped.Get())
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(11),
|
||||||
|
MetricTime(12),
|
||||||
|
MetricTime(13),
|
||||||
|
MetricTime(14),
|
||||||
|
MetricTime(15),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectWrapped() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
b.Add(MetricTime(7))
|
||||||
|
b.Add(MetricTime(8))
|
||||||
|
batch := b.Batch(3)
|
||||||
|
|
||||||
|
b.Add(MetricTime(9))
|
||||||
|
b.Add(MetricTime(10))
|
||||||
|
b.Add(MetricTime(11))
|
||||||
|
b.Add(MetricTime(12))
|
||||||
|
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
batch = b.Batch(5)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(8),
|
||||||
|
MetricTime(9),
|
||||||
|
MetricTime(10),
|
||||||
|
MetricTime(11),
|
||||||
|
MetricTime(12),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectAdjustFirst() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := s.newTestBuffer(10)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
b.Add(MetricTime(3))
|
||||||
|
batch := b.Batch(3)
|
||||||
|
b.Add(MetricTime(4))
|
||||||
|
b.Add(MetricTime(5))
|
||||||
|
b.Add(MetricTime(6))
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
b.Add(MetricTime(7))
|
||||||
|
b.Add(MetricTime(8))
|
||||||
|
b.Add(MetricTime(9))
|
||||||
|
batch = b.Batch(3)
|
||||||
|
b.Add(MetricTime(10))
|
||||||
|
b.Add(MetricTime(11))
|
||||||
|
b.Add(MetricTime(12))
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
b.Add(MetricTime(13))
|
||||||
|
b.Add(MetricTime(14))
|
||||||
|
b.Add(MetricTime(15))
|
||||||
|
batch = b.Batch(3)
|
||||||
|
b.Add(MetricTime(16))
|
||||||
|
b.Add(MetricTime(17))
|
||||||
|
b.Add(MetricTime(18))
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
b.Add(MetricTime(19))
|
||||||
|
|
||||||
|
batch = b.Batch(10)
|
||||||
|
testutil.RequireMetricsEqual(s.T(),
|
||||||
|
[]telegraf.Metric{
|
||||||
|
MetricTime(10),
|
||||||
|
MetricTime(11),
|
||||||
|
MetricTime(12),
|
||||||
|
MetricTime(13),
|
||||||
|
MetricTime(14),
|
||||||
|
MetricTime(15),
|
||||||
|
MetricTime(16),
|
||||||
|
MetricTime(17),
|
||||||
|
MetricTime(18),
|
||||||
|
MetricTime(19),
|
||||||
|
}, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AddDropsOverwrittenMetrics() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
|
||||||
|
s.Equal(int64(5), b.Stats().MetricsDropped.Get())
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsWritten.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AcceptRemovesBatch() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(1, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectLeavesBatch() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Reject(batch)
|
||||||
|
s.Equal(3, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AcceptWritesOverwrittenBatch() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
b.Accept(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get())
|
||||||
|
s.Equal(int64(5), b.Stats().MetricsWritten.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchRejectDropsOverwrittenBatch() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
b.Reject(batch)
|
||||||
|
|
||||||
|
s.Equal(int64(5), b.Stats().MetricsDropped.Get())
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsWritten.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchAccept() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(3)
|
||||||
|
b.Add(m, m, m)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsDropped.Get(), "dropped")
|
||||||
|
s.Equal(int64(3), b.Stats().MetricsWritten.Get(), "written")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchReject() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(3)
|
||||||
|
b.Add(m, m, m)
|
||||||
|
b.Reject(batch)
|
||||||
|
s.Equal(int64(3), b.Stats().MetricsDropped.Get())
|
||||||
|
s.Equal(int64(0), b.Stats().MetricsWritten.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_MetricsBatchAcceptRemoved() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(3)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(int64(2), b.Stats().MetricsDropped.Get())
|
||||||
|
s.Equal(int64(3), b.Stats().MetricsWritten.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_WrapWithBatch() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
|
||||||
|
b.Add(m, m, m)
|
||||||
|
b.Batch(3)
|
||||||
|
b.Add(m, m, m, m, m, m)
|
||||||
|
|
||||||
|
s.Equal(int64(1), b.Stats().MetricsDropped.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchNotRemoved() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
b.Batch(2)
|
||||||
|
s.Equal(5, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() {
|
||||||
|
m := Metric()
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(m, m, m, m, m)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Reject(batch)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(5, b.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AcceptCallsMetricAccept() {
|
||||||
|
var accept int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
AcceptF: func() {
|
||||||
|
accept++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(2, accept)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
var reject int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
RejectF: func() {
|
||||||
|
reject++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
b.Add(mm, mm)
|
||||||
|
s.Equal(2, reject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNotInBatch() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
var reject int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
RejectF: func() {
|
||||||
|
reject++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(2, reject)
|
||||||
|
b.Reject(batch)
|
||||||
|
s.Equal(4, reject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectCallsMetricRejectWithOverwritten() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
var reject int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
RejectF: func() {
|
||||||
|
reject++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
batch := b.Batch(5)
|
||||||
|
b.Add(mm, mm)
|
||||||
|
s.Equal(0, reject)
|
||||||
|
b.Reject(batch)
|
||||||
|
s.Equal(2, reject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndReject() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
var reject int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
RejectF: func() {
|
||||||
|
reject++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
batch := b.Batch(5)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
b.Add(mm, mm, mm, mm, mm)
|
||||||
|
s.Equal(15, reject)
|
||||||
|
b.Reject(batch)
|
||||||
|
s.Equal(20, reject)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() {
|
||||||
|
if !s.hasMaxCapacity {
|
||||||
|
s.T().Skip("tested buffer does not have a maximum capacity")
|
||||||
|
}
|
||||||
|
|
||||||
|
var reject int
|
||||||
|
var accept int
|
||||||
|
mm := &MockMetric{
|
||||||
|
Metric: Metric(),
|
||||||
|
RejectF: func() {
|
||||||
|
reject++
|
||||||
|
},
|
||||||
|
AcceptF: func() {
|
||||||
|
accept++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
b.Add(mm, mm, mm)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(2, reject)
|
||||||
|
batch := b.Batch(5)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(2, reject)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(5, reject)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(9, reject)
|
||||||
|
b.Add(mm, mm, mm, mm)
|
||||||
|
s.Equal(13, reject)
|
||||||
|
b.Accept(batch)
|
||||||
|
s.Equal(13, reject)
|
||||||
|
s.Equal(5, accept)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() {
|
||||||
|
b := s.newTestBuffer(5)
|
||||||
|
batch := b.Batch(2)
|
||||||
|
b.Add(MetricTime(1))
|
||||||
|
b.Reject(batch)
|
||||||
|
b.Add(MetricTime(2))
|
||||||
|
batch = b.Batch(2)
|
||||||
|
for _, m := range batch {
|
||||||
|
s.NotNil(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,726 +0,0 @@
|
||||||
package models
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MockMetric struct {
|
|
||||||
telegraf.Metric
|
|
||||||
AcceptF func()
|
|
||||||
RejectF func()
|
|
||||||
DropF func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetric) Accept() {
|
|
||||||
m.AcceptF()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetric) Reject() {
|
|
||||||
m.RejectF()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetric) Drop() {
|
|
||||||
m.DropF()
|
|
||||||
}
|
|
||||||
|
|
||||||
func Metric() telegraf.Metric {
|
|
||||||
return MetricTime(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MetricTime(sec int64) telegraf.Metric {
|
|
||||||
m := metric.New(
|
|
||||||
"cpu",
|
|
||||||
map[string]string{},
|
|
||||||
map[string]interface{}{
|
|
||||||
"value": 42.0,
|
|
||||||
},
|
|
||||||
time.Unix(sec, 0),
|
|
||||||
)
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkAddMetrics(b *testing.B) {
|
|
||||||
buf := NewBuffer("test", "", 10000)
|
|
||||||
m := Metric()
|
|
||||||
for n := 0; n < b.N; n++ {
|
|
||||||
buf.Add(m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func setup(b *Buffer) *Buffer {
|
|
||||||
b.MetricsAdded.Set(0)
|
|
||||||
b.MetricsWritten.Set(0)
|
|
||||||
b.MetricsDropped.Set(0)
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_LenEmpty(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
require.Equal(t, 0, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_LenOne(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m)
|
|
||||||
|
|
||||||
require.Equal(t, 1, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_LenFull(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
|
|
||||||
require.Equal(t, 5, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_LenOverfill(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
setup(b)
|
|
||||||
b.Add(m, m, m, m, m, m)
|
|
||||||
|
|
||||||
require.Equal(t, 5, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenZero(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
batch := b.Batch(0)
|
|
||||||
|
|
||||||
require.Empty(t, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenBufferEmpty(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
require.Empty(t, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenUnderfill(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
require.Len(t, batch, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenFill(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
require.Len(t, batch, 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenExact(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
require.Len(t, batch, 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(6)
|
|
||||||
require.Len(t, batch, 5)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchWrap(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Accept(batch)
|
|
||||||
b.Add(m, m)
|
|
||||||
batch = b.Batch(5)
|
|
||||||
require.Len(t, batch, 5)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLatest(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 4))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(1),
|
|
||||||
MetricTime(2),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchLatestWrap(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 4))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_MultipleBatch(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 10))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
batch := b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(1),
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
}, batch)
|
|
||||||
b.Accept(batch)
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(6),
|
|
||||||
}, batch)
|
|
||||||
b.Accept(batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectWithRoom(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(1),
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectNothingNewFull(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(1),
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectNoRoom(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
b.Add(MetricTime(7))
|
|
||||||
b.Add(MetricTime(8))
|
|
||||||
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(3), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
MetricTime(6),
|
|
||||||
MetricTime(7),
|
|
||||||
MetricTime(8),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectRoomExact(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(1),
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(1)
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(1), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(2),
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
MetricTime(6),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectPartialRoom(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
b.Add(MetricTime(7))
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(2), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(3),
|
|
||||||
MetricTime(4),
|
|
||||||
MetricTime(5),
|
|
||||||
MetricTime(6),
|
|
||||||
MetricTime(7),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
|
|
||||||
// buffer: 1, 4, 5; batch: 2, 3
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
b.Add(MetricTime(7))
|
|
||||||
b.Add(MetricTime(8))
|
|
||||||
b.Add(MetricTime(9))
|
|
||||||
b.Add(MetricTime(10))
|
|
||||||
|
|
||||||
// buffer: 8, 9, 10, 6, 7; batch: 2, 3
|
|
||||||
require.Equal(t, int64(3), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
b.Add(MetricTime(11))
|
|
||||||
b.Add(MetricTime(12))
|
|
||||||
b.Add(MetricTime(13))
|
|
||||||
b.Add(MetricTime(14))
|
|
||||||
b.Add(MetricTime(15))
|
|
||||||
// buffer: 13, 14, 15, 11, 12; batch: 2, 3
|
|
||||||
require.Equal(t, int64(8), b.MetricsDropped.Get())
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(10), b.MetricsDropped.Get())
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(11),
|
|
||||||
MetricTime(12),
|
|
||||||
MetricTime(13),
|
|
||||||
MetricTime(14),
|
|
||||||
MetricTime(15),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectWrapped(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
b.Add(MetricTime(7))
|
|
||||||
b.Add(MetricTime(8))
|
|
||||||
batch := b.Batch(3)
|
|
||||||
|
|
||||||
b.Add(MetricTime(9))
|
|
||||||
b.Add(MetricTime(10))
|
|
||||||
b.Add(MetricTime(11))
|
|
||||||
b.Add(MetricTime(12))
|
|
||||||
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
batch = b.Batch(5)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(8),
|
|
||||||
MetricTime(9),
|
|
||||||
MetricTime(10),
|
|
||||||
MetricTime(11),
|
|
||||||
MetricTime(12),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectAdjustFirst(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 10))
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
b.Add(MetricTime(3))
|
|
||||||
batch := b.Batch(3)
|
|
||||||
b.Add(MetricTime(4))
|
|
||||||
b.Add(MetricTime(5))
|
|
||||||
b.Add(MetricTime(6))
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
b.Add(MetricTime(7))
|
|
||||||
b.Add(MetricTime(8))
|
|
||||||
b.Add(MetricTime(9))
|
|
||||||
batch = b.Batch(3)
|
|
||||||
b.Add(MetricTime(10))
|
|
||||||
b.Add(MetricTime(11))
|
|
||||||
b.Add(MetricTime(12))
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
b.Add(MetricTime(13))
|
|
||||||
b.Add(MetricTime(14))
|
|
||||||
b.Add(MetricTime(15))
|
|
||||||
batch = b.Batch(3)
|
|
||||||
b.Add(MetricTime(16))
|
|
||||||
b.Add(MetricTime(17))
|
|
||||||
b.Add(MetricTime(18))
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
b.Add(MetricTime(19))
|
|
||||||
|
|
||||||
batch = b.Batch(10)
|
|
||||||
testutil.RequireMetricsEqual(t,
|
|
||||||
[]telegraf.Metric{
|
|
||||||
MetricTime(10),
|
|
||||||
MetricTime(11),
|
|
||||||
MetricTime(12),
|
|
||||||
MetricTime(13),
|
|
||||||
MetricTime(14),
|
|
||||||
MetricTime(15),
|
|
||||||
MetricTime(16),
|
|
||||||
MetricTime(17),
|
|
||||||
MetricTime(18),
|
|
||||||
MetricTime(19),
|
|
||||||
}, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
|
|
||||||
require.Equal(t, int64(5), b.MetricsDropped.Get())
|
|
||||||
require.Equal(t, int64(0), b.MetricsWritten.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AcceptRemovesBatch(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, 1, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectLeavesBatch(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Reject(batch)
|
|
||||||
require.Equal(t, 3, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(5)
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
b.Accept(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get())
|
|
||||||
require.Equal(t, int64(5), b.MetricsWritten.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(5)
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
b.Reject(batch)
|
|
||||||
|
|
||||||
require.Equal(t, int64(5), b.MetricsDropped.Get())
|
|
||||||
require.Equal(t, int64(0), b.MetricsWritten.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(3)
|
|
||||||
b.Add(m, m, m)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped")
|
|
||||||
require.Equal(t, int64(3), b.MetricsWritten.Get(), "written")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(3)
|
|
||||||
b.Add(m, m, m)
|
|
||||||
b.Reject(batch)
|
|
||||||
require.Equal(t, int64(3), b.MetricsDropped.Get())
|
|
||||||
require.Equal(t, int64(0), b.MetricsWritten.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(3)
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, int64(2), b.MetricsDropped.Get())
|
|
||||||
require.Equal(t, int64(3), b.MetricsWritten.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_WrapWithBatch(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
|
|
||||||
b.Add(m, m, m)
|
|
||||||
b.Batch(3)
|
|
||||||
b.Add(m, m, m, m, m, m)
|
|
||||||
|
|
||||||
require.Equal(t, int64(1), b.MetricsDropped.Get())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchNotRemoved(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
b.Batch(2)
|
|
||||||
require.Equal(t, 5, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
|
|
||||||
m := Metric()
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(m, m, m, m, m)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Reject(batch)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, 5, b.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AcceptCallsMetricAccept(t *testing.T) {
|
|
||||||
var accept int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
AcceptF: func() {
|
|
||||||
accept++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(mm, mm, mm)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, 2, accept)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) {
|
|
||||||
var reject int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
RejectF: func() {
|
|
||||||
reject++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
setup(b)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
b.Add(mm, mm)
|
|
||||||
require.Equal(t, 2, reject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) {
|
|
||||||
var reject int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
RejectF: func() {
|
|
||||||
reject++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
setup(b)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 2, reject)
|
|
||||||
b.Reject(batch)
|
|
||||||
require.Equal(t, 4, reject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) {
|
|
||||||
var reject int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
RejectF: func() {
|
|
||||||
reject++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
batch := b.Batch(5)
|
|
||||||
b.Add(mm, mm)
|
|
||||||
require.Equal(t, 0, reject)
|
|
||||||
b.Reject(batch)
|
|
||||||
require.Equal(t, 2, reject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AddOverwriteAndReject(t *testing.T) {
|
|
||||||
var reject int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
RejectF: func() {
|
|
||||||
reject++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
batch := b.Batch(5)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
b.Add(mm, mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 15, reject)
|
|
||||||
b.Reject(batch)
|
|
||||||
require.Equal(t, 20, reject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
|
|
||||||
var reject int
|
|
||||||
var accept int
|
|
||||||
mm := &MockMetric{
|
|
||||||
Metric: Metric(),
|
|
||||||
RejectF: func() {
|
|
||||||
reject++
|
|
||||||
},
|
|
||||||
AcceptF: func() {
|
|
||||||
accept++
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
b.Add(mm, mm, mm)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 2, reject)
|
|
||||||
batch := b.Batch(5)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 2, reject)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 5, reject)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 9, reject)
|
|
||||||
b.Add(mm, mm, mm, mm)
|
|
||||||
require.Equal(t, 13, reject)
|
|
||||||
b.Accept(batch)
|
|
||||||
require.Equal(t, 13, reject)
|
|
||||||
require.Equal(t, 5, accept)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuffer_RejectEmptyBatch(t *testing.T) {
|
|
||||||
b := setup(NewBuffer("test", "", 5))
|
|
||||||
batch := b.Batch(2)
|
|
||||||
b.Add(MetricTime(1))
|
|
||||||
b.Reject(batch)
|
|
||||||
b.Add(MetricTime(2))
|
|
||||||
batch = b.Batch(2)
|
|
||||||
for _, m := range batch {
|
|
||||||
require.NotNil(t, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -37,6 +37,9 @@ type OutputConfig struct {
|
||||||
NameOverride string
|
NameOverride string
|
||||||
NamePrefix string
|
NamePrefix string
|
||||||
NameSuffix string
|
NameSuffix string
|
||||||
|
|
||||||
|
BufferStrategy string
|
||||||
|
BufferDirectory string
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunningOutput contains the output configuration
|
// RunningOutput contains the output configuration
|
||||||
|
|
@ -56,7 +59,7 @@ type RunningOutput struct {
|
||||||
|
|
||||||
BatchReady chan time.Time
|
BatchReady chan time.Time
|
||||||
|
|
||||||
buffer *Buffer
|
buffer Buffer
|
||||||
log telegraf.Logger
|
log telegraf.Logger
|
||||||
|
|
||||||
started bool
|
started bool
|
||||||
|
|
@ -96,8 +99,13 @@ func NewRunningOutput(
|
||||||
batchSize = DefaultMetricBatchSize
|
batchSize = DefaultMetricBatchSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
ro := &RunningOutput{
|
ro := &RunningOutput{
|
||||||
buffer: NewBuffer(config.Name, config.Alias, bufferLimit),
|
buffer: b,
|
||||||
BatchReady: make(chan time.Time, 1),
|
BatchReady: make(chan time.Time, 1),
|
||||||
Output: output,
|
Output: output,
|
||||||
Config: config,
|
Config: config,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue