From 3c3c0d835209e62273824fd7e13c642290416a63 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 18 Apr 2023 17:46:22 +0200 Subject: [PATCH] fix(processors.lookup): Fix tracking metrics (#13092) --- metric/tracking.go | 5 ++ plugins/processors/lookup/lookup.go | 11 +++- plugins/processors/lookup/lookup_test.go | 64 +++++++++++++++++++ .../testcases/non_existing_tag/expected.out | 3 + .../testcases/non_existing_tag/input.influx | 3 + .../testcases/non_existing_tag/lut.json | 8 +++ .../testcases/non_existing_tag/telegraf.conf | 3 + 7 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 plugins/processors/lookup/testcases/non_existing_tag/expected.out create mode 100644 plugins/processors/lookup/testcases/non_existing_tag/input.influx create mode 100644 plugins/processors/lookup/testcases/non_existing_tag/lut.json create mode 100644 plugins/processors/lookup/testcases/non_existing_tag/telegraf.conf diff --git a/metric/tracking.go b/metric/tracking.go index 15c590274..c7672f30f 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -150,6 +150,11 @@ func (m *trackingMetric) decr() { } } +// Unwrap allows to access the underlying metric directly e.g. for go-templates +func (m *trackingMetric) Unwrap() telegraf.Metric { + return m.Metric +} + type deliveryInfo struct { id telegraf.TrackingID accepted int diff --git a/plugins/processors/lookup/lookup.go b/plugins/processors/lookup/lookup.go index 46a18fd4c..ad9c4eb91 100644 --- a/plugins/processors/lookup/lookup.go +++ b/plugins/processors/lookup/lookup.go @@ -17,6 +17,8 @@ import ( "github.com/influxdata/telegraf/plugins/processors" ) +type unwrappableMetric interface{ Unwrap() telegraf.Metric } + //go:embed sample.conf var sampleConfig string @@ -64,7 +66,14 @@ func (p *Processor) Init() error { func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric { out := make([]telegraf.Metric, 0, len(in)) - for _, m := range in { + for _, raw := range in { + var m telegraf.Metric + if wm, ok := raw.(unwrappableMetric); ok { + m = wm.Unwrap() + } else { + m = raw + } + var buf bytes.Buffer if err := p.tmpl.Execute(&buf, m); err != nil { p.Log.Errorf("generating key failed: %v", err) diff --git a/plugins/processors/lookup/lookup_test.go b/plugins/processors/lookup/lookup_test.go index a3a384cd0..6b654aa96 100644 --- a/plugins/processors/lookup/lookup_test.go +++ b/plugins/processors/lookup/lookup_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/testutil" @@ -92,3 +93,66 @@ func TestCases(t *testing.T) { }) } } + +func TestCasesTracking(t *testing.T) { + // Get all directories in testcases + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Make sure tests contains data + require.NotEmpty(t, folders) + + // Set up for file inputs + processors.Add("lookup", func() telegraf.Processor { + return &Processor{Log: testutil.Logger{}} + }) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + fname := f.Name() + testdataPath := filepath.Join("testcases", fname) + configFilename := filepath.Join(testdataPath, "telegraf.conf") + inputFilename := filepath.Join(testdataPath, "input.influx") + expectedFilename := filepath.Join(testdataPath, "expected.out") + + t.Run(fname, func(t *testing.T) { + // Get parser to parse input and expected output + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + inputRaw, err := testutil.ParseMetricsFromFile(inputFilename, parser) + require.NoError(t, err) + input := make([]telegraf.Metric, 0, len(inputRaw)) + notify := func(_ telegraf.DeliveryInfo) {} + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + } + + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Processors, 1, "wrong number of processors") + + type unwrappableProcessor interface{ Unwrap() telegraf.Processor } + proc := cfg.Processors[0].Processor.(unwrappableProcessor) + plugin := proc.Unwrap().(*Processor) + require.NoError(t, plugin.Init()) + + // Process expected metrics and compare with resulting metrics + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) + }) + } +} diff --git a/plugins/processors/lookup/testcases/non_existing_tag/expected.out b/plugins/processors/lookup/testcases/non_existing_tag/expected.out new file mode 100644 index 000000000..f080c548a --- /dev/null +++ b/plugins/processors/lookup/testcases/non_existing_tag/expected.out @@ -0,0 +1,3 @@ +test,lutkey=peters_friend,location=home value=42i 1678124473000000123 +other,status=alert value=-1i 1678124473000000575 +test,lutkey=marys_son,location=party value=666i 1678124473000000944 diff --git a/plugins/processors/lookup/testcases/non_existing_tag/input.influx b/plugins/processors/lookup/testcases/non_existing_tag/input.influx new file mode 100644 index 000000000..816eb6882 --- /dev/null +++ b/plugins/processors/lookup/testcases/non_existing_tag/input.influx @@ -0,0 +1,3 @@ +test,lutkey=peters_friend value=42i 1678124473000000123 +other,status=alert value=-1i 1678124473000000575 +test,lutkey=marys_son value=666i 1678124473000000944 diff --git a/plugins/processors/lookup/testcases/non_existing_tag/lut.json b/plugins/processors/lookup/testcases/non_existing_tag/lut.json new file mode 100644 index 000000000..ca0ed2248 --- /dev/null +++ b/plugins/processors/lookup/testcases/non_existing_tag/lut.json @@ -0,0 +1,8 @@ +{ + "peters_friend": { + "location": "home" + }, + "marys_son": { + "location": "party" + } +} \ No newline at end of file diff --git a/plugins/processors/lookup/testcases/non_existing_tag/telegraf.conf b/plugins/processors/lookup/testcases/non_existing_tag/telegraf.conf new file mode 100644 index 000000000..90249194c --- /dev/null +++ b/plugins/processors/lookup/testcases/non_existing_tag/telegraf.conf @@ -0,0 +1,3 @@ +[[processors.lookup]] + files = ["testcases/non_existing_tag/lut.json"] + key = '{{.Tag "lutkey"}}'