diff --git a/plugins/processors/topk/topk.go b/plugins/processors/topk/topk.go index 5ba973cba..05fa1ecb5 100644 --- a/plugins/processors/topk/topk.go +++ b/plugins/processors/topk/topk.go @@ -163,7 +163,7 @@ func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric { // holding undelivered metrics while the input waits for metrics to be // delivered. Instead, treat all handled metrics as delivered and // produced metrics as untracked in a similar way to aggregators. - m.Drop() + m.Accept() // Check if the metric has any of the fields over which we are aggregating hasField := false diff --git a/plugins/processors/topk/topk_test.go b/plugins/processors/topk/topk_test.go index 8e2550a35..de148a158 100644 --- a/plugins/processors/topk/topk_test.go +++ b/plugins/processors/topk/topk_test.go @@ -1,11 +1,15 @@ package topk import ( + "sync" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -55,11 +59,11 @@ func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []tele // For every input metric, we check if there is a change we need to apply // If there is no change for a given input metric, the metric is dropped - for i, metric := range input { + for i, m := range input { change, ok := changeSet[i] if ok { // Deep copy the metric - newMetric := metric.Copy() + newMetric := m.Copy() // Add new fields if change.newFields != nil { @@ -501,3 +505,77 @@ func TestTopkGroupByKeyTag(t *testing.T) { // Run the test runAndCompare(&topk, input, answer, "GroupByKeyTag test", t) } + +func TestTracking(t *testing.T) { + inputRaw := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 100}, time.Unix(0, 0)), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 22}, time.Unix(0, 0)), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 1}, time.Unix(0, 0)), + } + + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + input := make([]telegraf.Metric, 0, len(inputRaw)) + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + } + + expected := []telegraf.Metric{ + metric.New( + "foo", + map[string]string{}, + map[string]interface{}{"value": 100}, + time.Unix(0, 0), + ), + metric.New( + "bar", + map[string]string{}, + map[string]interface{}{"value": 22}, + time.Unix(0, 0), + ), + metric.New( + "baz", + map[string]string{}, + map[string]interface{}{"value": 1}, + time.Unix(0, 0), + ), + } + + // Only doing this over 1 period, so we should expect the same number of + // metrics back. + plugin := &TopK{ + Period: 1, + K: 3, + Aggregation: "mean", + Fields: []string{"value"}, + Log: testutil.Logger{}, + } + plugin.Reset() + + // Process expected metrics and compare with resulting metrics + var actual []telegraf.Metric + require.Eventuallyf(t, func() bool { + actual = plugin.Apply(input...) + return len(actual) > 0 + }, time.Second, 100*time.Millisecond, "never got any metrics") + testutil.RequireMetricsEqual(t, expected, actual) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(expected) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) +}