diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go index 578b1c762..350b4bcdb 100644 --- a/plugins/processors/parser/parser.go +++ b/plugins/processors/parser/parser.go @@ -48,6 +48,8 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { newMetrics := []telegraf.Metric{} if !p.DropOriginal { newMetrics = append(newMetrics, metric) + } else { + metric.Drop() } // parse fields diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go index 8df011234..b662224f8 100644 --- a/plugins/processors/parser/parser_test.go +++ b/plugins/processors/parser/parser_test.go @@ -1,6 +1,7 @@ package parser import ( + "sync" "testing" "time" @@ -811,6 +812,78 @@ func TestBadApply(t *testing.T) { } } +func TestTracking(t *testing.T) { + var testCases = []struct { + name string + numMetrics int + parser Parser + payload string + }{ + { + name: "keep all", + numMetrics: 2, + parser: Parser{ + DropOriginal: false, + ParseFields: []string{"payload"}, + parser: &json.Parser{}, + }, + payload: `{"value": 1}`, + }, + { + name: "drop original", + numMetrics: 1, + parser: Parser{ + DropOriginal: true, + ParseFields: []string{"payload"}, + parser: &json.Parser{}, + }, + payload: `{"value": 1}`, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Create a tracking metric and tap the delivery information + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, 1) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + // Configure the plugin + plugin := tt.parser + require.NoError(t, plugin.Init()) + + // Process expected metrics and compare with resulting metrics + testMetric := metric.New( + "test", + map[string]string{}, + map[string]interface{}{ + "payload": tt.payload, + }, + time.Unix(0, 0), + ) + + input, _ := metric.WithTracking(testMetric, notify) + result := plugin.Apply(input) + + // Ensure we get back the correct number of metrics + require.Len(t, result, tt.numMetrics) + for _, m := range result { + m.Accept() + } + + // Simulate output acknowledging delivery of metrics and check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(delivered) == 1 + }, 1*time.Second, 100*time.Millisecond, "original metric not delivered") + }) + } +} + // Benchmarks func getMetricFields(m telegraf.Metric) interface{} {