diff --git a/plugins/processors/converter/converter_test.go b/plugins/processors/converter/converter_test.go index e8376deb9..1c8a88679 100644 --- a/plugins/processors/converter/converter_test.go +++ b/plugins/processors/converter/converter_test.go @@ -2,12 +2,14 @@ package converter import ( "math" + "sync" "testing" "time" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -667,14 +669,13 @@ func TestConverter(t *testing.T) { }, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.converter.Log = testutil.Logger{} + require.NoError(t, tt.converter.Init()) - err := tt.converter.Init() - require.NoError(t, err) actual := tt.converter.Apply(tt.input) - testutil.RequireMetricsEqual(t, tt.expected, actual) }) } @@ -773,11 +774,9 @@ func TestMeasurement(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.converter.Log = testutil.Logger{} - err := tt.converter.Init() - require.NoError(t, err) + require.NoError(t, tt.converter.Init()) actual := tt.converter.Apply(tt.input) - testutil.RequireMetricsEqual(t, tt.expected, actual) }) } @@ -787,6 +786,56 @@ func TestEmptyConfigInitError(t *testing.T) { converter := &Converter{ Log: testutil.Logger{}, } - err := converter.Init() - require.Error(t, err) + require.Error(t, converter.Init()) +} + +func TestTracking(t *testing.T) { + inputRaw := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "topic": "telegraf"}, time.Unix(0, 0)), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "topic": "telegraf"}, time.Unix(0, 0)), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "topic": "telegraf"}, time.Unix(0, 0)), + } + + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + input := make([]telegraf.Metric, 0, len(inputRaw)) + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + } + + expected := []telegraf.Metric{ + metric.New("telegraf", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("telegraf", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + metric.New("telegraf", map[string]string{}, map[string]interface{}{"value": 42}, time.Unix(0, 0)), + } + + plugin := &Converter{ + Fields: &Conversion{ + Measurement: []string{"topic"}, + }, + } + require.NoError(t, plugin.Init()) + + // Process expected metrics and compare with resulting metrics + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(expected) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) }