From 53298356187e8deebd48aada02fb40459dde0473 Mon Sep 17 00:00:00 2001 From: nagaflokhu <8855309+nagaflokhu@users.noreply.github.com> Date: Thu, 24 Apr 2025 12:10:11 -0400 Subject: [PATCH] fix(agent): Correctly truncate the disk buffer (#16697) Co-authored-by: Sven Rebhan --- models/buffer_disk.go | 33 +++++++++++---------- models/buffer_disk_test.go | 59 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 799ac2475..9c891112c 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -146,14 +146,12 @@ func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction { offsets := make([]int, 0, batchSize) readIndex := b.batchFirst endIndex := b.writeIndex() - offset := 0 - for batchSize > 0 && readIndex < endIndex { + for offset := 0; batchSize > 0 && readIndex < endIndex; offset++ { data, err := b.file.Read(readIndex) if err != nil { panic(err) } readIndex++ - offset++ if slices.Contains(b.mask, offset) { // Metric is masked by a previous write and is scheduled for removal @@ -230,34 +228,35 @@ func (b *DiskBuffer) EndTransaction(tx *Transaction) { } // Determine up to which index we can remove the entries from the WAL file - var removeIdx int + var correction int for i, offset := range b.mask { if offset != i { break } - removeIdx = offset + correction = offset } + // The 'correction' denotes the offset to subtract from the remaining mask + // (if any) and the 'removalIdx' denotes the index to use when truncating + // the file and mask. Keep them separate to be able to handle the special + // "the file cannot be empty" property of the WAL file. + removeIdx := correction + 1 // Remove the metrics in front from the WAL file - b.isEmpty = b.entries()-removeIdx-1 <= 0 + b.isEmpty = b.entries()-removeIdx <= 0 if b.isEmpty { // WAL files cannot be fully empty but need to contain at least one // item to not throw an error - if err := b.file.TruncateFront(b.writeIndex()); err != nil { - log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) - panic(err) - } - } else { - if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil { - log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) - panic(err) - } + removeIdx-- + } + if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx)); err != nil { + log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) + panic(err) } // Truncate the mask and update the relative offsets - b.mask = b.mask[:removeIdx] + b.mask = b.mask[removeIdx:] for i := range b.mask { - b.mask[i] -= removeIdx + b.mask[i] -= correction } // check if the original end index is still valid, clear if not diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index 15ff25a73..5e8b8219f 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -93,3 +93,62 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) { } testutil.RequireMetricsEqual(t, expected, tx.Batch) } + +// TestDiskBufferTruncate is a regression test for +// https://github.com/influxdata/telegraf/issues/16696 +func TestDiskBufferTruncate(t *testing.T) { + // Create a disk buffer + buf, err := NewBuffer("test", "id123", "", 0, "disk", t.TempDir()) + require.NoError(t, err) + defer buf.Close() + diskBuf, ok := buf.(*DiskBuffer) + require.True(t, ok, "buffer is not a disk buffer") + + // Add some metrics to the buffer + expected := make([]telegraf.Metric, 0, 10) + for i := range 10 { + m := metric.New("test", map[string]string{}, map[string]interface{}{"value": i}, time.Now()) + buf.Add(m) + expected = append(expected, m) + } + + // Get a batch, test the metrics and acknowledge all metrics + tx := buf.BeginTransaction(4) + testutil.RequireMetricsEqual(t, expected[:4], tx.Batch) + tx.AcceptAll() + buf.EndTransaction(tx) + + // The buffer must have been truncated on disk and the mask should be empty + require.Equal(t, 6, diskBuf.entries()) + require.Empty(t, diskBuf.mask) + + // Get a second batch, test the metrics and acknowledge all metrics except + // for the first one. + tx = buf.BeginTransaction(4) + testutil.RequireMetricsEqual(t, expected[4:8], tx.Batch) + tx.Accept = []int{1, 2, 3} + buf.EndTransaction(tx) + + // The buffer cannot be truncated on disk as the first metric must be kept. + // However, the mask now must contain the accepted indices... + require.Equal(t, 6, diskBuf.entries()) + require.Equal(t, []int{1, 2, 3}, diskBuf.mask) + + // Get a third batch with all the remaining metrics, test them and + // acknowledge all + tx = buf.BeginTransaction(4) + remaining := append([]telegraf.Metric{expected[4]}, expected[8:]...) + testutil.RequireMetricsEqual(t, remaining, tx.Batch) + tx.AcceptAll() + buf.EndTransaction(tx) + + // The buffer should be truncated completely, however due to the WAL + // implementation the file cannot be completely empty. So we expect one + // entry left on disk but this one being masked... + require.Equal(t, 1, diskBuf.entries()) + require.Equal(t, []int{0}, diskBuf.mask) + + // We shouldn't get any metric when requesting a new batch + tx = buf.BeginTransaction(4) + require.Empty(t, tx.Batch) +}