fix(processors.starlark): Use tracking ID to identify tracking metrics (#14523)

This commit is contained in:
Sven Rebhan 2024-01-05 15:48:28 +01:00 committed by GitHub
parent 1410ea649b
commit cbaca43e36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 106 additions and 7 deletions

View File

@ -12,8 +12,7 @@ import (
)
type Metric struct {
ID uint64
ID telegraf.TrackingID
metric telegraf.Metric
tagIterCount int
fieldIterCount int
@ -22,7 +21,9 @@ type Metric struct {
// Wrap updates the starlark.Metric to wrap a new telegraf.Metric.
func (m *Metric) Wrap(metric telegraf.Metric) {
m.ID = metric.HashID()
if tm, ok := metric.(telegraf.TrackingMetric); ok {
m.ID = tm.TrackingID()
}
m.metric = metric
m.tagIterCount = 0
m.fieldIterCount = 0
@ -49,6 +50,9 @@ func (m *Metric) String() string {
buf.WriteString(", time=")
buf.WriteString(m.Time().String())
buf.WriteString(")")
if m.ID != 0 {
fmt.Fprintf(buf, "[tracking ID=%v]", m.ID)
}
return buf.String()
}

View File

@ -54,13 +54,13 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
}
parameters[0].(*common.Metric).Wrap(origMetric)
rv, err := s.Call("apply")
returnValue, err := s.Call("apply")
if err != nil {
s.LogError(err)
return err
}
switch rv := rv.(type) {
switch rv := returnValue.(type) {
case *starlark.List:
iter := rv.Iterate()
defer iter.Done()
@ -77,7 +77,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
// Previous metric was found, accept the starlark metric, add
// the original metric to the accumulator
if v.ID == origMetric.HashID() {
if v.ID != 0 {
origFound = true
s.results = append(s.results, origMetric)
acc.AddMetric(origMetric)
@ -106,7 +106,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
m := rv.Unwrap()
// If we got the original metric back, use that and drop the new one.
// Otherwise mark the original as accepted and use the new metric.
if origMetric.HashID() == rv.ID {
if rv.ID != 0 {
acc.AddMetric(origMetric)
} else {
origMetric.Accept()

View File

@ -3426,6 +3426,15 @@ def apply(metric):
def apply(metric):
x = deepcopy(metric, track=True)
return [x]
`,
},
{
name: "issue #14484",
numMetrics: 1,
source: `
def apply(metric):
metric.tags.pop("tag1")
return [metric]
`,
},
}
@ -3468,6 +3477,92 @@ def apply(metric):
}
}
func TestTrackingStateful(t *testing.T) {
var testCases = []struct {
name string
source string
results int
loops int
delivery int
}{
{
name: "delayed release",
loops: 4,
results: 3,
delivery: 4,
source: `
state = {"last": None}
def apply(metric):
previous = state["last"]
state["last"] = deepcopy(metric)
return previous
`,
},
{
name: "delayed release with tracking",
loops: 4,
results: 3,
delivery: 3,
source: `
state = {"last": None}
def apply(metric):
previous = state["last"]
state["last"] = deepcopy(metric, track=True)
return previous
`,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Create a tracking metric and tap the delivery information
var mu sync.Mutex
delivered := make([]telegraf.TrackingID, 0, tt.delivery)
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di.ID())
}
// Configure the plugin
plugin := newStarlarkFromSource(tt.source)
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Start(acc))
// Do the requested number of loops
expected := make([]telegraf.TrackingID, 0, tt.loops)
for i := 0; i < tt.loops; i++ {
// Process expected metrics and compare with resulting metrics
input, tid := metric.WithTracking(testutil.TestMetric(i), notify)
expected = append(expected, tid)
require.NoError(t, plugin.Add(input, acc))
}
plugin.Stop()
expected = expected[:tt.delivery]
// Simulate output acknowledging delivery of metrics and check delivery
actual := acc.GetTelegrafMetrics()
// Ensure we get back the correct number of metrics
require.Lenf(t, actual, tt.results, "expected %d metrics but got %d", tt.results, len(actual))
for _, m := range actual {
m.Accept()
}
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(delivered) >= tt.delivery
}, 1*time.Second, 100*time.Millisecond, "original metric(s) not delivered")
mu.Lock()
defer mu.Unlock()
require.ElementsMatch(t, expected, delivered, "mismatch in delivered metrics")
})
}
}
// parses metric lines out of line protocol following a header, with a trailing blank line
func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) {
parser := &influx.Parser{}