2024-07-18 02:42:45 +08:00
|
|
|
package models
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
"github.com/tidwall/wal"
|
|
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
|
"github.com/influxdata/telegraf/metric"
|
|
|
|
|
"github.com/influxdata/telegraf/testutil"
|
|
|
|
|
)
|
|
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
func TestDiskBufferRetainsTrackingInformation(t *testing.T) {
|
|
|
|
|
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
|
|
|
|
|
|
|
|
|
|
var delivered int
|
|
|
|
|
mm, _ := metric.WithTracking(m, func(telegraf.DeliveryInfo) { delivered++ })
|
2024-07-18 02:42:45 +08:00
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
buf, err := NewBuffer("test", "123", "", 0, "disk", t.TempDir())
|
2024-07-18 02:42:45 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
buf.Stats().MetricsAdded.Set(0)
|
|
|
|
|
buf.Stats().MetricsWritten.Set(0)
|
|
|
|
|
buf.Stats().MetricsDropped.Set(0)
|
2024-11-05 22:27:40 +08:00
|
|
|
defer buf.Close()
|
2024-07-18 02:42:45 +08:00
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
buf.Add(mm)
|
2024-12-05 04:55:11 +08:00
|
|
|
tx := buf.BeginTransaction(1)
|
|
|
|
|
tx.AcceptAll()
|
|
|
|
|
buf.EndTransaction(tx)
|
2024-07-18 02:42:45 +08:00
|
|
|
require.Equal(t, 1, delivered)
|
|
|
|
|
}
|
|
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) {
|
|
|
|
|
m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
|
2024-07-18 02:42:45 +08:00
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
tm, _ := metric.WithTracking(m, func(telegraf.DeliveryInfo) {})
|
2024-07-18 02:42:45 +08:00
|
|
|
metrics := []telegraf.Metric{
|
|
|
|
|
// Basic metric with 1 field, 0 timestamp
|
2024-11-05 22:27:40 +08:00
|
|
|
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
|
2024-07-18 02:42:45 +08:00
|
|
|
// Basic metric with 1 field, different timestamp
|
2024-11-05 22:27:40 +08:00
|
|
|
metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 20.0}, time.Now()),
|
2024-07-18 02:42:45 +08:00
|
|
|
// Metric with a field
|
2024-11-05 22:27:40 +08:00
|
|
|
metric.New("cpu", map[string]string{"x": "y"}, map[string]interface{}{"value": 18.0}, time.Now()),
|
2024-07-18 02:42:45 +08:00
|
|
|
// Tracking metric
|
|
|
|
|
tm,
|
|
|
|
|
// Metric with lots of tag types
|
|
|
|
|
metric.New(
|
|
|
|
|
"cpu",
|
|
|
|
|
map[string]string{},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"value_f64": 20.0,
|
|
|
|
|
"value_uint64": uint64(10),
|
|
|
|
|
"value_int16": int16(5),
|
|
|
|
|
"value_string": "foo",
|
|
|
|
|
"value_boolean": true,
|
|
|
|
|
"value_byte_array": []byte{1, 2, 3, 4, 5},
|
|
|
|
|
},
|
|
|
|
|
time.Now(),
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// call manually so that we can properly use metric.ToBytes() without having initialized a buffer
|
|
|
|
|
registerGob()
|
|
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
// Prefill the WAL file
|
|
|
|
|
path := t.TempDir()
|
|
|
|
|
walfile, err := wal.Open(filepath.Join(path, "123"), nil)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer walfile.Close()
|
2024-07-18 02:42:45 +08:00
|
|
|
for i, m := range metrics {
|
|
|
|
|
data, err := metric.ToBytes(m)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, walfile.Write(uint64(i+1), data))
|
|
|
|
|
}
|
2024-11-05 22:27:40 +08:00
|
|
|
walfile.Close()
|
|
|
|
|
|
|
|
|
|
// Create a buffer
|
|
|
|
|
buf, err := NewBuffer("123", "123", "", 0, "disk", path)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
buf.Stats().MetricsAdded.Set(0)
|
|
|
|
|
buf.Stats().MetricsWritten.Set(0)
|
|
|
|
|
buf.Stats().MetricsDropped.Set(0)
|
|
|
|
|
defer buf.Close()
|
|
|
|
|
|
2024-12-05 04:55:11 +08:00
|
|
|
tx := buf.BeginTransaction(4)
|
2024-07-18 02:42:45 +08:00
|
|
|
|
2024-11-05 22:27:40 +08:00
|
|
|
// Check that the tracking metric is skipped
|
2024-07-18 02:42:45 +08:00
|
|
|
expected := []telegraf.Metric{
|
|
|
|
|
metrics[0], metrics[1], metrics[2], metrics[4],
|
|
|
|
|
}
|
2024-12-05 04:55:11 +08:00
|
|
|
testutil.RequireMetricsEqual(t, expected, tx.Batch)
|
2024-07-18 02:42:45 +08:00
|
|
|
}
|