From a01d9624e20afc90896dfe279ed3e64171113251 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 8 Feb 2024 21:57:12 +0100 Subject: [PATCH] test(processors.clone): Add unit-test for tracking metrics (#14737) --- plugins/processors/clone/clone.go | 19 +- plugins/processors/clone/clone_test.go | 240 ++++++++++++++++++++----- 2 files changed, 209 insertions(+), 50 deletions(-) diff --git a/plugins/processors/clone/clone.go b/plugins/processors/clone/clone.go index 007317a8a..cfe1bd071 100644 --- a/plugins/processors/clone/clone.go +++ b/plugins/processors/clone/clone.go @@ -23,25 +23,26 @@ func (*Clone) SampleConfig() string { } func (c *Clone) Apply(in ...telegraf.Metric) []telegraf.Metric { - cloned := []telegraf.Metric{} - - for _, metric := range in { - cloned = append(cloned, metric.Copy()) + out := make([]telegraf.Metric, 0, 2*len(in)) + for _, original := range in { + m := original.Copy() if len(c.NameOverride) > 0 { - metric.SetName(c.NameOverride) + m.SetName(c.NameOverride) } if len(c.NamePrefix) > 0 { - metric.AddPrefix(c.NamePrefix) + m.AddPrefix(c.NamePrefix) } if len(c.NameSuffix) > 0 { - metric.AddSuffix(c.NameSuffix) + m.AddSuffix(c.NameSuffix) } for key, value := range c.Tags { - metric.AddTag(key, value) + m.AddTag(key, value) } + out = append(out, m) } - return append(in, cloned...) + + return append(out, in...) } func init() { diff --git a/plugins/processors/clone/clone_test.go b/plugins/processors/clone/clone_test.go index 93d54a93a..09d9a1fdc 100644 --- a/plugins/processors/clone/clone_test.go +++ b/plugins/processors/clone/clone_test.go @@ -1,6 +1,7 @@ package clone import ( + "sync" "testing" "time" @@ -8,77 +9,234 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" ) -func createTestMetric() telegraf.Metric { - m := metric.New("m1", +func TestRetainsTags(t *testing.T) { + input := metric.New( + "m1", map[string]string{"metric_tag": "from_metric"}, map[string]interface{}{"value": int64(1)}, - time.Now(), + time.Unix(0, 0), ) - return m -} -func calculateProcessedTags(processor Clone, m telegraf.Metric) map[string]string { - processed := processor.Apply(m) - return processed[0].Tags() -} + expected := []telegraf.Metric{ + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } -func TestRetainsTags(t *testing.T) { - processor := Clone{} - - tags := calculateProcessedTags(processor, createTestMetric()) - - value, present := tags["metric_tag"] - require.True(t, present, "Tag of metric was not present") - require.Equal(t, "from_metric", value, "Value of Tag was changed") + plugin := &Clone{} + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) } func TestAddTags(t *testing.T) { - processor := Clone{Tags: map[string]string{"added_tag": "from_config", "another_tag": ""}} + input := metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ) - tags := calculateProcessedTags(processor, createTestMetric()) + expected := []telegraf.Metric{ + metric.New( + "m1", + map[string]string{ + "metric_tag": "from_metric", + "added_tag": "from_config", + "another_tag": "", + }, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } - value, present := tags["added_tag"] - require.True(t, present, "Additional Tag of metric was not present") - require.Equal(t, "from_config", value, "Value of Tag was changed") - require.Len(t, tags, 3, "Should have one previous and two added tags.") + plugin := &Clone{ + Tags: map[string]string{ + "added_tag": "from_config", + "another_tag": "", + }, + } + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) } func TestOverwritesPresentTagValues(t *testing.T) { - processor := Clone{Tags: map[string]string{"metric_tag": "from_config"}} + input := metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ) - tags := calculateProcessedTags(processor, createTestMetric()) + expected := []telegraf.Metric{ + metric.New( + "m1", + map[string]string{"metric_tag": "from_config"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } - value, present := tags["metric_tag"] - require.True(t, present, "Tag of metric was not present") - require.Len(t, tags, 1, "Should only have one tag.") - require.Equal(t, "from_config", value, "Value of Tag was not changed") + plugin := &Clone{ + Tags: map[string]string{"metric_tag": "from_config"}, + } + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) } func TestOverridesName(t *testing.T) { - processor := Clone{NameOverride: "overridden"} + input := metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ) - processed := processor.Apply(createTestMetric()) + expected := []telegraf.Metric{ + metric.New( + "overridden", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } - require.Equal(t, "overridden", processed[0].Name(), "Name was not overridden") - require.Equal(t, "m1", processed[1].Name(), "Original metric was modified") + plugin := &Clone{NameOverride: "overridden"} + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) } func TestNamePrefix(t *testing.T) { - processor := Clone{NamePrefix: "Pre-"} + input := metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ) - processed := processor.Apply(createTestMetric()) + expected := []telegraf.Metric{ + metric.New( + "Pre-m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } - require.Equal(t, "Pre-m1", processed[0].Name(), "Prefix was not applied") - require.Equal(t, "m1", processed[1].Name(), "Original metric was modified") + plugin := &Clone{NamePrefix: "Pre-"} + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) } func TestNameSuffix(t *testing.T) { - processor := Clone{NameSuffix: "-suff"} + input := metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ) - processed := processor.Apply(createTestMetric()) + expected := []telegraf.Metric{ + metric.New( + "m1-suff", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Unix(0, 0), + ), + } - require.Equal(t, "m1-suff", processed[0].Name(), "Suffix was not applied") - require.Equal(t, "m1", processed[1].Name(), "Original metric was modified") + plugin := &Clone{NameSuffix: "-suff"} + actual := plugin.Apply(input) + testutil.RequireMetricsEqual(t, expected, actual) +} + +func TestTracking(t *testing.T) { + inputRaw := []telegraf.Metric{ + metric.New( + "m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Now(), + ), + metric.New( + "m2", + map[string]string{"metric_tag": "foo_metric"}, + map[string]interface{}{"value": int64(2)}, + time.Now(), + ), + } + + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + input := make([]telegraf.Metric, 0, len(inputRaw)) + expected := make([]telegraf.Metric, 0, 2*len(input)) + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + expected = append(expected, m) + } + expected = append(expected, input...) + + // Process expected metrics and compare with resulting metrics + plugin := &Clone{} + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(input) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) }