Send metrics in FIFO order (#7814)

Metrics are send from older to newer metrics, even when outputs is
failing.  In case of buffer full, we still drop the oldest metrics, but
non-dropped metrics are send in the order they are received.
This commit is contained in:
Pierre Fersing 2020-07-14 20:32:54 +02:00 committed by GitHub
parent fa0f739360
commit 3ec3f1bc50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 87 deletions

View File

@ -105,6 +105,7 @@
- [#7558](https://github.com/influxdata/telegraf/issues/7558): Remove trailing backslash from tag keys/values in influx serializer. - [#7558](https://github.com/influxdata/telegraf/issues/7558): Remove trailing backslash from tag keys/values in influx serializer.
- [#7715](https://github.com/influxdata/telegraf/issues/7715): Fix incorrect Azure SQL DB server properties. - [#7715](https://github.com/influxdata/telegraf/issues/7715): Fix incorrect Azure SQL DB server properties.
- [#7431](https://github.com/influxdata/telegraf/issues/7431): Fix json unmarshal error in the kibana input. - [#7431](https://github.com/influxdata/telegraf/issues/7431): Fix json unmarshal error in the kibana input.
- [#5633](https://github.com/influxdata/telegraf/issues/5633): Send metrics in FIFO order.
## v1.14.5 [2020-06-30] ## v1.14.5 [2020-06-30]

View File

@ -111,7 +111,7 @@ func (b *Buffer) add(m telegraf.Metric) int {
b.metricDropped(b.buf[b.last]) b.metricDropped(b.buf[b.last])
dropped++ dropped++
if b.last == b.batchFirst && b.batchSize > 0 { if b.batchSize > 0 {
b.batchSize-- b.batchSize--
b.batchFirst = b.next(b.batchFirst) b.batchFirst = b.next(b.batchFirst)
} }
@ -146,8 +146,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int {
return dropped return dropped
} }
// Batch returns a slice containing up to batchSize of the most recently added // Batch returns a slice containing up to batchSize of the oldest metrics not
// metrics. Metrics are ordered from newest to oldest in the batch. The // yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client. // batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric { func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock() b.Lock()
@ -159,18 +159,17 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
return out return out
} }
b.batchFirst = b.cap + b.last - outLen b.batchFirst = b.first
b.batchFirst %= b.cap
b.batchSize = outLen b.batchSize = outLen
batchIndex := b.batchFirst batchIndex := b.batchFirst
for i := range out { for i := range out {
out[len(out)-1-i] = b.buf[batchIndex] out[i] = b.buf[batchIndex]
b.buf[batchIndex] = nil b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex) batchIndex = b.next(batchIndex)
} }
b.last = b.batchFirst b.first = b.nextby(b.first, b.batchSize)
b.size -= outLen b.size -= outLen
return out return out
} }
@ -198,38 +197,22 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
return return
} }
older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size free := b.cap - b.size
restore := min(len(batch), free+older) restore := min(len(batch), free)
skip := len(batch) - restore
// Rotate newer metrics forward the number of metrics that we can restore. b.first = b.prevby(b.first, restore)
rb := b.batchFirst b.size = min(b.size+restore, b.cap)
rp := b.last
re := b.nextby(rp, restore)
b.last = re
for rb != rp && rp != re { re := b.first
rp = b.prev(rp)
re = b.prev(re)
if b.buf[re] != nil { // Copy metrics from the batch back into the buffer
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}
b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}
// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
for i := range batch { for i := range batch {
if i < restore { if i < skip {
re = b.prev(re)
b.buf[re] = batch[i]
b.size = min(b.size+1, b.cap)
} else {
b.metricDropped(batch[i]) b.metricDropped(batch[i])
} else {
b.buf[re] = batch[i]
re = b.next(re)
} }
} }
@ -273,6 +256,17 @@ func (b *Buffer) prev(index int) int {
return index return index
} }
// prevby returns the index that is count older with wrapping.
func (b *Buffer) prevby(index, count int) int {
index -= count
for index < 0 {
index += b.cap
}
index %= b.cap
return index
}
func (b *Buffer) resetBatch() { func (b *Buffer) resetBatch() {
b.batchFirst = 0 b.batchFirst = 0
b.batchSize = 0 b.batchSize = 0

View File

@ -161,7 +161,7 @@ func TestBuffer_BatchLatest(t *testing.T) {
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(3), MetricTime(1),
MetricTime(2), MetricTime(2),
}, batch) }, batch)
} }
@ -177,8 +177,8 @@ func TestBuffer_BatchLatestWrap(t *testing.T) {
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(5), MetricTime(2),
MetricTime(4), MetricTime(3),
}, batch) }, batch)
} }
@ -193,17 +193,17 @@ func TestBuffer_MultipleBatch(t *testing.T) {
batch := b.Batch(5) batch := b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(6), MetricTime(1),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2), MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch) }, batch)
b.Accept(batch) b.Accept(batch)
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(1), MetricTime(6),
}, batch) }, batch)
b.Accept(batch) b.Accept(batch)
} }
@ -223,11 +223,11 @@ func TestBuffer_RejectWithRoom(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1), MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch) }, batch)
} }
@ -246,11 +246,11 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1), MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch) }, batch)
} }
@ -275,11 +275,11 @@ func TestBuffer_RejectNoRoom(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(8),
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4), MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
MetricTime(8),
}, batch) }, batch)
} }
@ -299,11 +299,11 @@ func TestBuffer_RejectRoomExact(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1), MetricTime(1),
MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
}, batch) }, batch)
} }
@ -324,11 +324,11 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2), MetricTime(2),
MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
}, batch) }, batch)
} }
@ -351,11 +351,11 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3), MetricTime(3),
MetricTime(4),
MetricTime(5),
MetricTime(6),
MetricTime(7),
}, batch) }, batch)
} }
@ -394,11 +394,11 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11), MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
}, batch) }, batch)
} }
@ -425,11 +425,11 @@ func TestBuffer_RejectWrapped(t *testing.T) {
batch = b.Batch(5) batch = b.Batch(5)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8), MetricTime(8),
MetricTime(9),
MetricTime(10),
MetricTime(11),
MetricTime(12),
}, batch) }, batch)
} }
@ -467,16 +467,16 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) {
batch = b.Batch(10) batch = b.Batch(10)
testutil.RequireMetricsEqual(t, testutil.RequireMetricsEqual(t,
[]telegraf.Metric{ []telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10), MetricTime(10),
MetricTime(11),
MetricTime(12),
MetricTime(13),
MetricTime(14),
MetricTime(15),
MetricTime(16),
MetricTime(17),
MetricTime(18),
MetricTime(19),
}, batch) }, batch)
} }

View File

@ -360,7 +360,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// Verify that 10 metrics were written // Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10) assert.Len(t, m.Metrics(), 10)
// Verify that they are in order // Verify that they are in order
expected := append(reverse(next5), reverse(first5)...) expected := append(first5, next5...)
assert.Equal(t, expected, m.Metrics()) assert.Equal(t, expected, m.Metrics())
} }
@ -421,9 +421,9 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
// Verify that 20 metrics were written // Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20) assert.Len(t, m.Metrics(), 20)
// Verify that they are in order // Verify that they are in order
expected := append(reverse(next5), reverse(first5)...) expected := append(first5, next5...)
expected = append(expected, reverse(next5)...) expected = append(expected, first5...)
expected = append(expected, reverse(first5)...) expected = append(expected, next5...)
assert.Equal(t, expected, m.Metrics()) assert.Equal(t, expected, m.Metrics())
} }
@ -464,7 +464,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
// Verify that 6 metrics were written // Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6) assert.Len(t, m.Metrics(), 6)
// Verify that they are in order // Verify that they are in order
expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]} expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]}
assert.Equal(t, expected, m.Metrics()) assert.Equal(t, expected, m.Metrics())
} }