fix(agent): Correctly truncate the disk buffer (#16697)

Co-authored-by: Sven Rebhan <srebhan@influxdata.com>
This commit is contained in:
nagaflokhu 2025-04-24 12:10:11 -04:00 committed by GitHub
parent 16610ce351
commit 5329835618
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 75 additions and 17 deletions

View File

@ -146,14 +146,12 @@ func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction {
offsets := make([]int, 0, batchSize) offsets := make([]int, 0, batchSize)
readIndex := b.batchFirst readIndex := b.batchFirst
endIndex := b.writeIndex() endIndex := b.writeIndex()
offset := 0 for offset := 0; batchSize > 0 && readIndex < endIndex; offset++ {
for batchSize > 0 && readIndex < endIndex {
data, err := b.file.Read(readIndex) data, err := b.file.Read(readIndex)
if err != nil { if err != nil {
panic(err) panic(err)
} }
readIndex++ readIndex++
offset++
if slices.Contains(b.mask, offset) { if slices.Contains(b.mask, offset) {
// Metric is masked by a previous write and is scheduled for removal // Metric is masked by a previous write and is scheduled for removal
@ -230,34 +228,35 @@ func (b *DiskBuffer) EndTransaction(tx *Transaction) {
} }
// Determine up to which index we can remove the entries from the WAL file // Determine up to which index we can remove the entries from the WAL file
var removeIdx int var correction int
for i, offset := range b.mask { for i, offset := range b.mask {
if offset != i { if offset != i {
break break
} }
removeIdx = offset correction = offset
} }
// The 'correction' denotes the offset to subtract from the remaining mask
// (if any) and the 'removalIdx' denotes the index to use when truncating
// the file and mask. Keep them separate to be able to handle the special
// "the file cannot be empty" property of the WAL file.
removeIdx := correction + 1
// Remove the metrics in front from the WAL file // Remove the metrics in front from the WAL file
b.isEmpty = b.entries()-removeIdx-1 <= 0 b.isEmpty = b.entries()-removeIdx <= 0
if b.isEmpty { if b.isEmpty {
// WAL files cannot be fully empty but need to contain at least one // WAL files cannot be fully empty but need to contain at least one
// item to not throw an error // item to not throw an error
if err := b.file.TruncateFront(b.writeIndex()); err != nil { removeIdx--
}
if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx)); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err) panic(err)
} }
} else {
if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err)
}
}
// Truncate the mask and update the relative offsets // Truncate the mask and update the relative offsets
b.mask = b.mask[:removeIdx] b.mask = b.mask[removeIdx:]
for i := range b.mask { for i := range b.mask {
b.mask[i] -= removeIdx b.mask[i] -= correction
} }
// check if the original end index is still valid, clear if not // check if the original end index is still valid, clear if not

View File

@ -93,3 +93,62 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) {
} }
testutil.RequireMetricsEqual(t, expected, tx.Batch) testutil.RequireMetricsEqual(t, expected, tx.Batch)
} }
// TestDiskBufferTruncate is a regression test for
// https://github.com/influxdata/telegraf/issues/16696
func TestDiskBufferTruncate(t *testing.T) {
// Create a disk buffer
buf, err := NewBuffer("test", "id123", "", 0, "disk", t.TempDir())
require.NoError(t, err)
defer buf.Close()
diskBuf, ok := buf.(*DiskBuffer)
require.True(t, ok, "buffer is not a disk buffer")
// Add some metrics to the buffer
expected := make([]telegraf.Metric, 0, 10)
for i := range 10 {
m := metric.New("test", map[string]string{}, map[string]interface{}{"value": i}, time.Now())
buf.Add(m)
expected = append(expected, m)
}
// Get a batch, test the metrics and acknowledge all metrics
tx := buf.BeginTransaction(4)
testutil.RequireMetricsEqual(t, expected[:4], tx.Batch)
tx.AcceptAll()
buf.EndTransaction(tx)
// The buffer must have been truncated on disk and the mask should be empty
require.Equal(t, 6, diskBuf.entries())
require.Empty(t, diskBuf.mask)
// Get a second batch, test the metrics and acknowledge all metrics except
// for the first one.
tx = buf.BeginTransaction(4)
testutil.RequireMetricsEqual(t, expected[4:8], tx.Batch)
tx.Accept = []int{1, 2, 3}
buf.EndTransaction(tx)
// The buffer cannot be truncated on disk as the first metric must be kept.
// However, the mask now must contain the accepted indices...
require.Equal(t, 6, diskBuf.entries())
require.Equal(t, []int{1, 2, 3}, diskBuf.mask)
// Get a third batch with all the remaining metrics, test them and
// acknowledge all
tx = buf.BeginTransaction(4)
remaining := append([]telegraf.Metric{expected[4]}, expected[8:]...)
testutil.RequireMetricsEqual(t, remaining, tx.Batch)
tx.AcceptAll()
buf.EndTransaction(tx)
// The buffer should be truncated completely, however due to the WAL
// implementation the file cannot be completely empty. So we expect one
// entry left on disk but this one being masked...
require.Equal(t, 1, diskBuf.entries())
require.Equal(t, []int{0}, diskBuf.mask)
// We shouldn't get any metric when requesting a new batch
tx = buf.BeginTransaction(4)
require.Empty(t, tx.Batch)
}