diff --git a/plugins/processors/split/split_test.go b/plugins/processors/split/split_test.go index de95be811..698eedd5b 100644 --- a/plugins/processors/split/split_test.go +++ b/plugins/processors/split/split_test.go @@ -3,14 +3,18 @@ package split import ( "os" "path/filepath" + "sync" "testing" + "time" + + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestCases(t *testing.T) { @@ -65,3 +69,87 @@ func TestCases(t *testing.T) { }) } } + +func TestTrackingMetrics(t *testing.T) { + type testcase struct { + name string + dropOriginal bool + input []telegraf.Metric + expected []telegraf.Metric + } + testcases := []testcase{ + { + name: "keep all", + dropOriginal: false, + input: []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 99}, time.Unix(0, 0)), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 99}, time.Unix(0, 0)), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + metric.New("new", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("new", map[string]string{}, map[string]interface{}{"value": 99}, time.Unix(0, 0)), + metric.New("new", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + }, + }, + { + name: "drop original", + dropOriginal: true, + input: []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 99}, time.Unix(0, 0)), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + metric.New("new", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("new", map[string]string{}, map[string]interface{}{"value": 99}, time.Unix(0, 0)), + metric.New("new", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + }, + }, + } + for _, tc := range testcases { + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(tc.input)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + input := make([]telegraf.Metric, 0, len(tc.input)) + for _, m := range tc.input { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + } + + plugin := &Split{ + DropOriginal: tc.dropOriginal, + Templates: []template{ + { + Name: "new", + Fields: []string{"value"}, + }, + }, + } + require.NoError(t, plugin.Init()) + + // Process expected metrics and compare with resulting metrics + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, tc.expected, actual, testutil.SortMetrics()) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(tc.input) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(tc.input)) + } +}