fix(processors.starlark): Maintain tracking information post-apply (#14137)

This commit is contained in:
Joshua Powers 2023-11-15 11:03:55 -07:00 committed by GitHub
parent 4581186943
commit edf230bc44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 13 deletions

View File

@ -12,6 +12,8 @@ import (
)
type Metric struct {
ID uint64
metric telegraf.Metric
tagIterCount int
fieldIterCount int
@ -20,6 +22,7 @@ type Metric struct {
// Wrap updates the starlark.Metric to wrap a new telegraf.Metric.
func (m *Metric) Wrap(metric telegraf.Metric) {
m.ID = metric.HashID()
m.metric = metric
m.tagIterCount = 0
m.fieldIterCount = 0

View File

@ -47,12 +47,12 @@ func (s *Starlark) Start(_ telegraf.Accumulator) error {
return nil
}
func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) error {
parameters, found := s.GetParameters("apply")
if !found {
return fmt.Errorf("the parameters of the apply function could not be found")
}
parameters[0].(*common.Metric).Wrap(metric)
parameters[0].(*common.Metric).Wrap(origMetric)
rv, err := s.Call("apply")
if err != nil {
@ -65,6 +65,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
iter := rv.Iterate()
defer iter.Done()
var v starlark.Value
var origFound bool
for iter.Next(&v) {
switch v := v.(type) {
case *common.Metric:
@ -73,6 +74,17 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
s.Log.Errorf("Duplicate metric reference detected")
continue
}
// Previous metric was found, accept the starlark metric, add
// the original metric to the accumulator
if v.ID == origMetric.HashID() {
origFound = true
m.Accept()
s.results = append(s.results, origMetric)
acc.AddMetric(origMetric)
continue
}
s.results = append(s.results, m)
acc.AddMetric(m)
default:
@ -82,8 +94,8 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
// If the script didn't return the original metrics, mark it as
// successfully handled.
if !containsMetric(s.results, metric) {
metric.Accept()
if !origFound {
origMetric.Drop()
}
// clear results
@ -93,15 +105,17 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
s.results = s.results[:0]
case *common.Metric:
m := rv.Unwrap()
// If the script returned a different metric, mark this metric as
// successfully handled.
if m != metric {
metric.Accept()
// If we got the original metric back, use that and drop the new one.
// Otherwise mark the original as accepted and use the new metric.
if origMetric.HashID() == rv.ID {
m.Accept()
acc.AddMetric(origMetric)
} else {
origMetric.Accept()
acc.AddMetric(m)
}
acc.AddMetric(m)
case starlark.NoneType:
metric.Drop()
origMetric.Drop()
default:
return fmt.Errorf("invalid type returned: %T", rv)
}
@ -111,9 +125,9 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
func (s *Starlark) Stop() {
}
func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
func containsMetric(metrics []telegraf.Metric, target telegraf.Metric) bool {
for _, m := range metrics {
if m == metric {
if m == target {
return true
}
}

View File

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@ -16,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
@ -3332,6 +3334,113 @@ func TestAllScriptTestData(t *testing.T) {
}
}
func TestTracking(t *testing.T) {
var testCases = []struct {
name string
source string
numMetrics int
}{
{
name: "return none",
numMetrics: 0,
source: `
def apply(metric):
return None
`,
},
{
name: "return empty list of metrics",
numMetrics: 0,
source: `
def apply(metric):
return []
`,
},
{
name: "return original metric",
numMetrics: 1,
source: `
def apply(metric):
return metric
`,
},
{
name: "return original metric in a list",
numMetrics: 1,
source: `
def apply(metric):
return [metric]
`,
},
{
name: "return new metric",
numMetrics: 1,
source: `
def apply(metric):
newmetric = Metric("new_metric")
newmetric.fields["vaue"] = 42
return newmetric
`,
},
{
name: "return new metric in a list",
numMetrics: 1,
source: `
def apply(metric):
newmetric = Metric("new_metric")
newmetric.fields["vaue"] = 42
return [newmetric]
`,
},
{
name: "return original and new metric in a list",
numMetrics: 2,
source: `
def apply(metric):
newmetric = Metric("new_metric")
newmetric.fields["vaue"] = 42
return [metric, newmetric]
`,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Create a tracking metric and tap the delivery information
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, 1)
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
// Configure the plugin
plugin := newStarlarkFromSource(tt.source)
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Start(acc))
// Process expected metrics and compare with resulting metrics
input, _ := metric.WithTracking(testutil.TestMetric(1.23), notify)
require.NoError(t, plugin.Add(input, acc))
plugin.Stop()
// Ensure we get back the correct number of metrics
require.Len(t, acc.GetTelegrafMetrics(), tt.numMetrics)
for _, m := range acc.GetTelegrafMetrics() {
m.Accept()
}
// Simulate output acknowledging delivery of metrics and check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(delivered) == 1
}, 1*time.Second, 100*time.Millisecond, "orignal metric not delivered")
})
}
}
// parses metric lines out of line protocol following a header, with a trailing blank line
func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) {
parser := &influx.Parser{}