fix(processors.parser): Drop original tracking metrics (#14655)

This commit is contained in:
Joshua Powers 2024-01-31 04:10:53 -07:00 committed by GitHub
parent 043ae3e8a0
commit 93783f813b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 75 additions and 0 deletions

View File

@ -48,6 +48,8 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
newMetrics := []telegraf.Metric{}
if !p.DropOriginal {
newMetrics = append(newMetrics, metric)
} else {
metric.Drop()
}
// parse fields

View File

@ -1,6 +1,7 @@
package parser
import (
"sync"
"testing"
"time"
@ -811,6 +812,78 @@ func TestBadApply(t *testing.T) {
}
}
func TestTracking(t *testing.T) {
var testCases = []struct {
name string
numMetrics int
parser Parser
payload string
}{
{
name: "keep all",
numMetrics: 2,
parser: Parser{
DropOriginal: false,
ParseFields: []string{"payload"},
parser: &json.Parser{},
},
payload: `{"value": 1}`,
},
{
name: "drop original",
numMetrics: 1,
parser: Parser{
DropOriginal: true,
ParseFields: []string{"payload"},
parser: &json.Parser{},
},
payload: `{"value": 1}`,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Create a tracking metric and tap the delivery information
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, 1)
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
// Configure the plugin
plugin := tt.parser
require.NoError(t, plugin.Init())
// Process expected metrics and compare with resulting metrics
testMetric := metric.New(
"test",
map[string]string{},
map[string]interface{}{
"payload": tt.payload,
},
time.Unix(0, 0),
)
input, _ := metric.WithTracking(testMetric, notify)
result := plugin.Apply(input)
// Ensure we get back the correct number of metrics
require.Len(t, result, tt.numMetrics)
for _, m := range result {
m.Accept()
}
// Simulate output acknowledging delivery of metrics and check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(delivered) == 1
}, 1*time.Second, 100*time.Millisecond, "original metric not delivered")
})
}
}
// Benchmarks
func getMetricFields(m telegraf.Metric) interface{} {