fix(processors.starlark): Avoid negative refcounts for tracking metrics (#14395)

This commit is contained in:
Sven Rebhan 2023-12-07 16:45:09 +01:00 committed by GitHub
parent aa681be594
commit 57fbc73814
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 187 additions and 88 deletions

View File

@ -142,3 +142,9 @@ type UnwrappableMetric interface {
// wraps it in the first place.
Unwrap() Metric
}
type TrackingMetric interface {
// TrackingID returns the ID used for tracking the metric
TrackingID() TrackingID
UnwrappableMetric
}

View File

@ -150,6 +150,11 @@ func (m *trackingMetric) decr() {
}
}
// Unwrap allows to access the underlying metric directly e.g. for go-templates
func (m *trackingMetric) TrackingID() telegraf.TrackingID {
return m.d.id
}
// Unwrap allows to access the underlying metric directly e.g. for go-templates
func (m *trackingMetric) Unwrap() telegraf.Metric {
return m.Metric

View File

@ -7,6 +7,7 @@ import (
"go.starlark.net/starlark"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
@ -49,13 +50,20 @@ func items(value starlark.Value, errorMsg string) ([]starlark.Tuple, error) {
func deepcopy(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var sm *Metric
if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil {
var track bool
if err := starlark.UnpackArgs("deepcopy", args, kwargs, "source", &sm, "track?", &track); err != nil {
return nil, err
}
dup := sm.metric.Copy()
dup.Drop()
return &Metric{metric: dup}, nil
// In case we copy a tracking metric but do not want to track the result,
// we have to strip the tracking information. This can be done by unwrapping
// the metric.
if tm, ok := sm.metric.(telegraf.TrackingMetric); ok && !track {
return &Metric{metric: tm.Unwrap().Copy()}, nil
}
// Copy the whole metric including potential tracking information
return &Metric{metric: sm.metric.Copy()}, nil
}
// catch(f) evaluates f() and returns its evaluation error message

View File

@ -2024,38 +2024,34 @@ func TestParse_DeltaCounter(t *testing.T) {
require.Eventuallyf(t, func() bool {
require.NoError(t, statsd.Gather(acc))
acc.Lock()
defer acc.Unlock()
fmt.Println(acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))
startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")
startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")
_, err = time.Parse(time.RFC3339, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339 format")
return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))
startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")
startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")
_, err = time.Parse(time.RFC3339, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339 format")
require.NoError(t, conn.Close())
}

View File

@ -356,17 +356,7 @@ cpu,42
return &parser, err
})
err = plugin.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
plugin.Stop()
require.NoError(t, plugin.Init())
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
@ -386,6 +376,15 @@ cpu,42
},
time.Unix(0, 0)),
}
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Gather(&acc))
require.Eventually(t, func() bool {
return acc.NFields() >= len(expected)
}, 3*time.Second, 100*time.Millisecond)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

View File

@ -84,7 +84,12 @@ of type int, float, string, or bool.
The timestamp of the metric as an integer in nanoseconds since the Unix
epoch.
- **deepcopy(*metric*)**: Make a copy of an existing metric.
- **deepcopy(*metric*, *track=false*)**:
Copy an existing metric with or without tracking information. If `track` is set
to `true`, the tracking information is copied.
**Caution:** Make sure to always return *all* metrics with tracking information!
Otherwise, the corresponding inputs will never receive the delivery information
and potentially overrun!
### Python Differences

View File

@ -79,7 +79,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
// the original metric to the accumulator
if v.ID == origMetric.HashID() {
origFound = true
m.Accept()
s.results = append(s.results, origMetric)
acc.AddMetric(origMetric)
continue
@ -108,7 +107,6 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
// If we got the original metric back, use that and drop the new one.
// Otherwise mark the original as accepted and use the new metric.
if origMetric.HashID() == rv.ID {
m.Accept()
acc.AddMetric(origMetric)
} else {
origMetric.Accept()

View File

@ -3400,6 +3400,32 @@ def apply(metric):
newmetric = Metric("new_metric")
newmetric.fields["vaue"] = 42
return [metric, newmetric]
`,
},
{
name: "return original and deep-copy",
numMetrics: 2,
source: `
def apply(metric):
return [metric, deepcopy(metric, track=True)]
`,
},
{
name: "deep-copy but do not return",
numMetrics: 1,
source: `
def apply(metric):
x = deepcopy(metric)
return [metric]
`,
},
{
name: "deep-copy but do not return original metric",
numMetrics: 1,
source: `
def apply(metric):
x = deepcopy(metric, track=True)
return [x]
`,
},
}
@ -3426,8 +3452,9 @@ def apply(metric):
plugin.Stop()
// Ensure we get back the correct number of metrics
require.Len(t, acc.GetTelegrafMetrics(), tt.numMetrics)
for _, m := range acc.GetTelegrafMetrics() {
actual := acc.GetTelegrafMetrics()
require.Lenf(t, actual, tt.numMetrics, "expected %d metrics but got %d", tt.numMetrics, len(actual))
for _, m := range actual {
m.Accept()
}

View File

@ -1,7 +1,7 @@
# Filter metrics by value
'''
In this example we look at the `value` field of the metric.
If the value is zeor, we delete all the fields, effectively dropping the metric.
If the value is zero, we delete all the fields, effectively dropping the metric.
Example Input:
temperature sensor="001A0",value=111.48 1618488000000000999

View File

@ -12,17 +12,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
var (
lastID uint64
)
func newTrackingID() telegraf.TrackingID {
id := atomic.AddUint64(&lastID, 1)
return telegraf.TrackingID(id)
}
// Metric defines a single point measurement
type Metric struct {
Measurement string
@ -38,33 +30,49 @@ func (p *Metric) String() string {
// Accumulator defines a mocked out accumulator
type Accumulator struct {
sync.Mutex
*sync.Cond
Metrics []*Metric
nMetrics uint64
Discard bool
Errors []error
debug bool
delivered chan telegraf.DeliveryInfo
nMetrics uint64 // Needs to be first to avoid unaligned atomic operations on 32-bit archs
Metrics []*Metric
accumulated []telegraf.Metric
Discard bool
Errors []error
debug bool
deliverChan chan telegraf.DeliveryInfo
delivered []telegraf.DeliveryInfo
TimeFunc func() time.Time
sync.Mutex
*sync.Cond
}
func (a *Accumulator) NMetrics() uint64 {
return atomic.LoadUint64(&a.nMetrics)
}
func (a *Accumulator) NDelivered() int {
a.Lock()
defer a.Unlock()
return len(a.delivered)
}
// GetTelegrafMetrics returns all the metrics collected by the accumulator
// If you are getting race conditions here then you are not waiting for all of your metrics to arrive: see Wait()
func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric {
metrics := []telegraf.Metric{}
for _, m := range a.Metrics {
metrics = append(metrics, FromTestMetric(m))
}
a.Lock()
defer a.Unlock()
metrics := make([]telegraf.Metric, 0, len(a.accumulated))
metrics = append(metrics, a.accumulated...)
return metrics
}
func (a *Accumulator) GetDeliveries() []telegraf.DeliveryInfo {
a.Lock()
defer a.Unlock()
info := make([]telegraf.DeliveryInfo, 0, len(a.delivered))
info = append(info, a.delivered...)
return info
}
func (a *Accumulator) FirstError() error {
if len(a.Errors) == 0 {
return nil
@ -77,6 +85,7 @@ func (a *Accumulator) ClearMetrics() {
defer a.Unlock()
atomic.StoreUint64(&a.nMetrics, 0)
a.Metrics = make([]*Metric, 0)
a.accumulated = make([]telegraf.Metric, 0)
}
func (a *Accumulator) addMeasurement(
@ -129,7 +138,7 @@ func (a *Accumulator) addMeasurement(
fmt.Print(msg)
}
p := &Metric{
m := &Metric{
Measurement: measurement,
Fields: fieldsCopy,
Tags: tagsCopy,
@ -137,7 +146,8 @@ func (a *Accumulator) addMeasurement(
Type: tp,
}
a.Metrics = append(a.Metrics, p)
a.Metrics = append(a.Metrics, m)
a.accumulated = append(a.accumulated, FromTestMetric(m))
}
// AddFields adds a measurement point with a specified timestamp.
@ -170,7 +180,7 @@ func (a *Accumulator) AddGauge(
func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
a.AddMetric(m)
}
}
@ -193,32 +203,57 @@ func (a *Accumulator) AddHistogram(
}
func (a *Accumulator) AddMetric(m telegraf.Metric) {
a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
a.Lock()
defer a.Unlock()
atomic.AddUint64(&a.nMetrics, 1)
if a.Cond != nil {
a.Cond.Broadcast()
}
if a.Discard {
return
}
// Drop metrics without fields
if len(m.FieldList()) == 0 {
return
}
a.Metrics = append(a.Metrics, ToTestMetric(m))
a.accumulated = append(a.accumulated, m)
}
func (a *Accumulator) WithTracking(_ int) telegraf.TrackingAccumulator {
func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
a.deliverChan = make(chan telegraf.DeliveryInfo, maxTracked)
a.delivered = make([]telegraf.DeliveryInfo, 0, maxTracked)
return a
}
func (a *Accumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID {
a.AddMetric(m)
return newTrackingID()
dm, id := metric.WithTracking(m, a.onDelivery)
a.AddMetric(dm)
return id
}
func (a *Accumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID {
for _, m := range group {
db, id := metric.WithGroupTracking(group, a.onDelivery)
for _, m := range db {
a.AddMetric(m)
}
return newTrackingID()
return id
}
func (a *Accumulator) onDelivery(info telegraf.DeliveryInfo) {
select {
case a.deliverChan <- info:
default:
// This is a programming error in the input. More items were sent for
// tracking than space requested.
panic("channel is full")
}
}
func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo {
a.Lock()
if a.delivered == nil {
a.delivered = make(chan telegraf.DeliveryInfo)
}
a.Unlock()
return a.delivered
return a.deliverChan
}
// AddError appends the given error to Accumulator.Errors.

View File

@ -372,3 +372,23 @@ func FromTestMetric(met *Metric) telegraf.Metric {
m := telegrafMetric.New(met.Measurement, met.Tags, met.Fields, met.Time, met.Type)
return m
}
func ToTestMetric(tm telegraf.Metric) *Metric {
tags := make(map[string]string, len(tm.TagList()))
for _, t := range tm.TagList() {
tags[t.Key] = t.Value
}
fields := make(map[string]interface{}, len(tm.FieldList()))
for _, f := range tm.FieldList() {
fields[f.Key] = f.Value
}
return &Metric{
Measurement: tm.Name(),
Fields: fields,
Tags: tags,
Time: tm.Time(),
Type: tm.Type(),
}
}