test(processors.topk): Add unit-test for tracking metrics (#14810)

Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com>
This commit is contained in:
Joshua Powers 2024-02-14 14:56:00 -05:00 committed by GitHub
parent 6b67fae8a8
commit 75efce9201
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 81 additions and 3 deletions

View File

@ -163,7 +163,7 @@ func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric {
// holding undelivered metrics while the input waits for metrics to be
// delivered. Instead, treat all handled metrics as delivered and
// produced metrics as untracked in a similar way to aggregators.
m.Drop()
m.Accept()
// Check if the metric has any of the fields over which we are aggregating
hasField := false

View File

@ -1,11 +1,15 @@
package topk
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
@ -55,11 +59,11 @@ func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []tele
// For every input metric, we check if there is a change we need to apply
// If there is no change for a given input metric, the metric is dropped
for i, metric := range input {
for i, m := range input {
change, ok := changeSet[i]
if ok {
// Deep copy the metric
newMetric := metric.Copy()
newMetric := m.Copy()
// Add new fields
if change.newFields != nil {
@ -501,3 +505,77 @@ func TestTopkGroupByKeyTag(t *testing.T) {
// Run the test
runAndCompare(&topk, input, answer, "GroupByKeyTag test", t)
}
func TestTracking(t *testing.T) {
inputRaw := []telegraf.Metric{
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 100}, time.Unix(0, 0)),
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 22}, time.Unix(0, 0)),
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)),
}
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
input := make([]telegraf.Metric, 0, len(inputRaw))
for _, m := range inputRaw {
tm, _ := metric.WithTracking(m, notify)
input = append(input, tm)
}
expected := []telegraf.Metric{
metric.New(
"foo",
map[string]string{},
map[string]interface{}{"value": 100},
time.Unix(0, 0),
),
metric.New(
"bar",
map[string]string{},
map[string]interface{}{"value": 22},
time.Unix(0, 0),
),
metric.New(
"baz",
map[string]string{},
map[string]interface{}{"value": 1},
time.Unix(0, 0),
),
}
// Only doing this over 1 period, so we should expect the same number of
// metrics back.
plugin := &TopK{
Period: 1,
K: 3,
Aggregation: "mean",
Fields: []string{"value"},
Log: testutil.Logger{},
}
plugin.Reset()
// Process expected metrics and compare with resulting metrics
var actual []telegraf.Metric
require.Eventuallyf(t, func() bool {
actual = plugin.Apply(input...)
return len(actual) > 0
}, time.Second, 100*time.Millisecond, "never got any metrics")
testutil.RequireMetricsEqual(t, expected, actual)
// Simulate output acknowledging delivery
for _, m := range actual {
m.Accept()
}
// Check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(expected) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
}