fix(agent): Fix buffer not flushing if all metrics are written (#15969)

This commit is contained in:
Dane Strandboge 2024-10-07 11:32:28 -05:00 committed by GitHub
parent 52e4c79ac6
commit 7435208b90
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 9 deletions

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -27,6 +26,11 @@ type DiskBuffer struct {
// Ending point of metrics read from disk on telegraf launch. // Ending point of metrics read from disk on telegraf launch.
// Used to know whether to discard tracking metrics. // Used to know whether to discard tracking metrics.
originalEnd uint64 originalEnd uint64
// The WAL library currently has no way to "fully empty" the walfile. In this case,
// we have to do our best and track that the walfile "should" be empty, so that next
// write, we can remove the invalid entry (also skipping this entry if it is being read).
isEmpty bool
} }
func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) { func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
@ -53,6 +57,9 @@ func (b *DiskBuffer) Len() int {
} }
func (b *DiskBuffer) length() int { func (b *DiskBuffer) length() int {
if b.isEmpty {
return 0
}
// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1) // Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)
if b.readIndex() == 0 { if b.readIndex() == 0 {
return 0 return 0
@ -87,6 +94,8 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
if !b.addSingleMetric(m) { if !b.addSingleMetric(m) {
dropped++ dropped++
} }
// as soon as a new metric is added, if this was empty, try to flush the "empty" metric out
b.handleEmptyFile()
} }
b.BufferSize.Set(int64(b.length())) b.BufferSize.Set(int64(b.length()))
return dropped return dropped
@ -169,7 +178,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.metricWritten(m) b.metricWritten(m)
} }
if b.length() == len(batch) { if b.length() == len(batch) {
b.resetWalFile() b.emptyFile()
} else { } else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil { if err != nil {
@ -205,15 +214,27 @@ func (b *DiskBuffer) resetBatch() {
} }
// This is very messy and not ideal, but serves as the only way I can find currently // This is very messy and not ideal, but serves as the only way I can find currently
// to actually clear the walfile completely if needed, since Truncate() calls require // to actually treat the walfile as empty if needed, since Truncate() calls require
// that at least one entry remains in them otherwise they return an error. // that at least one entry remains in them otherwise they return an error.
// Related issue: https://github.com/tidwall/wal/issues/20 // Related issue: https://github.com/tidwall/wal/issues/20
func (b *DiskBuffer) resetWalFile() { func (b *DiskBuffer) handleEmptyFile() {
b.file.Close() if !b.isEmpty {
os.Remove(b.path) return
walFile, err := wal.Open(b.path, nil) }
if err != nil { if err := b.file.TruncateFront(b.readIndex() + 1); err != nil {
log.Printf("E! readIndex: %d, buffer len: %d", b.readIndex(), b.length())
panic(err) panic(err)
} }
b.file = walFile b.isEmpty = false
}
func (b *DiskBuffer) emptyFile() {
if b.isEmpty || b.length() == 0 {
return
}
if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length())
panic(err)
}
b.isEmpty = true
} }

View File

@ -809,3 +809,26 @@ func (s *BufferSuiteTest) TestBuffer_RejectEmptyBatch() {
s.NotNil(m) s.NotNil(m)
} }
} }
func (s *BufferSuiteTest) TestBuffer_FlushedPartial() {
b := s.newTestBuffer(5)
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
s.Len(batch, 2)
b.Accept(batch)
s.Equal(1, b.Len())
}
func (s *BufferSuiteTest) TestBuffer_FlushedFull() {
b := s.newTestBuffer(5)
b.Add(MetricTime(1))
b.Add(MetricTime(2))
batch := b.Batch(2)
s.Len(batch, 2)
b.Accept(batch)
s.Equal(0, b.Len())
}