From edf230bc4456d678e3b71ad48bfeeea772ad99b5 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Wed, 15 Nov 2023 11:03:55 -0700 Subject: [PATCH] fix(processors.starlark): Maintain tracking information post-apply (#14137) --- plugins/common/starlark/metric.go | 3 + plugins/processors/starlark/starlark.go | 40 ++++--- plugins/processors/starlark/starlark_test.go | 109 +++++++++++++++++++ 3 files changed, 139 insertions(+), 13 deletions(-) diff --git a/plugins/common/starlark/metric.go b/plugins/common/starlark/metric.go index f1632312c..02e4e0e70 100644 --- a/plugins/common/starlark/metric.go +++ b/plugins/common/starlark/metric.go @@ -12,6 +12,8 @@ import ( ) type Metric struct { + ID uint64 + metric telegraf.Metric tagIterCount int fieldIterCount int @@ -20,6 +22,7 @@ type Metric struct { // Wrap updates the starlark.Metric to wrap a new telegraf.Metric. func (m *Metric) Wrap(metric telegraf.Metric) { + m.ID = metric.HashID() m.metric = metric m.tagIterCount = 0 m.fieldIterCount = 0 diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index 2d7e9e655..3e3cb1e63 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -47,12 +47,12 @@ func (s *Starlark) Start(_ telegraf.Accumulator) error { return nil } -func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { +func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) error { parameters, found := s.GetParameters("apply") if !found { return fmt.Errorf("the parameters of the apply function could not be found") } - parameters[0].(*common.Metric).Wrap(metric) + parameters[0].(*common.Metric).Wrap(origMetric) rv, err := s.Call("apply") if err != nil { @@ -65,6 +65,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { iter := rv.Iterate() defer iter.Done() var v starlark.Value + var origFound bool for iter.Next(&v) { switch v := v.(type) { case *common.Metric: @@ -73,6 +74,17 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { s.Log.Errorf("Duplicate metric reference detected") continue } + + // Previous metric was found, accept the starlark metric, add + // the original metric to the accumulator + if v.ID == origMetric.HashID() { + origFound = true + m.Accept() + s.results = append(s.results, origMetric) + acc.AddMetric(origMetric) + continue + } + s.results = append(s.results, m) acc.AddMetric(m) default: @@ -82,8 +94,8 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { // If the script didn't return the original metrics, mark it as // successfully handled. - if !containsMetric(s.results, metric) { - metric.Accept() + if !origFound { + origMetric.Drop() } // clear results @@ -93,15 +105,17 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { s.results = s.results[:0] case *common.Metric: m := rv.Unwrap() - - // If the script returned a different metric, mark this metric as - // successfully handled. - if m != metric { - metric.Accept() + // If we got the original metric back, use that and drop the new one. + // Otherwise mark the original as accepted and use the new metric. + if origMetric.HashID() == rv.ID { + m.Accept() + acc.AddMetric(origMetric) + } else { + origMetric.Accept() + acc.AddMetric(m) } - acc.AddMetric(m) case starlark.NoneType: - metric.Drop() + origMetric.Drop() default: return fmt.Errorf("invalid type returned: %T", rv) } @@ -111,9 +125,9 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { func (s *Starlark) Stop() { } -func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { +func containsMetric(metrics []telegraf.Metric, target telegraf.Metric) bool { for _, m := range metrics { - if m == metric { + if m == target { return true } } diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index 5b30daea5..310da2d6e 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -16,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" common "github.com/influxdata/telegraf/plugins/common/starlark" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" @@ -3332,6 +3334,113 @@ func TestAllScriptTestData(t *testing.T) { } } +func TestTracking(t *testing.T) { + var testCases = []struct { + name string + source string + numMetrics int + }{ + { + name: "return none", + numMetrics: 0, + source: ` +def apply(metric): + return None +`, + }, + { + name: "return empty list of metrics", + numMetrics: 0, + source: ` +def apply(metric): + return [] +`, + }, + { + name: "return original metric", + numMetrics: 1, + source: ` +def apply(metric): + return metric +`, + }, + { + name: "return original metric in a list", + numMetrics: 1, + source: ` +def apply(metric): + return [metric] +`, + }, + { + name: "return new metric", + numMetrics: 1, + source: ` +def apply(metric): + newmetric = Metric("new_metric") + newmetric.fields["vaue"] = 42 + return newmetric +`, + }, + { + name: "return new metric in a list", + numMetrics: 1, + source: ` +def apply(metric): + newmetric = Metric("new_metric") + newmetric.fields["vaue"] = 42 + return [newmetric] +`, + }, + { + name: "return original and new metric in a list", + numMetrics: 2, + source: ` +def apply(metric): + newmetric = Metric("new_metric") + newmetric.fields["vaue"] = 42 + return [metric, newmetric] +`, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Create a tracking metric and tap the delivery information + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, 1) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + // Configure the plugin + plugin := newStarlarkFromSource(tt.source) + require.NoError(t, plugin.Init()) + acc := &testutil.Accumulator{} + require.NoError(t, plugin.Start(acc)) + + // Process expected metrics and compare with resulting metrics + input, _ := metric.WithTracking(testutil.TestMetric(1.23), notify) + require.NoError(t, plugin.Add(input, acc)) + plugin.Stop() + + // Ensure we get back the correct number of metrics + require.Len(t, acc.GetTelegrafMetrics(), tt.numMetrics) + for _, m := range acc.GetTelegrafMetrics() { + m.Accept() + } + + // Simulate output acknowledging delivery of metrics and check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(delivered) == 1 + }, 1*time.Second, 100*time.Millisecond, "orignal metric not delivered") + }) + } +} + // parses metric lines out of line protocol following a header, with a trailing blank line func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) { parser := &influx.Parser{}