From e490983fe1badad2f1e3ad19b236a9a2de0e3030 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Sun, 18 Feb 2024 15:31:43 -0600 Subject: [PATCH] fix(processors.unpivot): Handle tracking metrics correctly (#14832) --- plugins/processors/unpivot/unpivot.go | 7 +++- plugins/processors/unpivot/unpivot_test.go | 43 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/plugins/processors/unpivot/unpivot.go b/plugins/processors/unpivot/unpivot.go index 7163d1b28..3f41d6bb7 100644 --- a/plugins/processors/unpivot/unpivot.go +++ b/plugins/processors/unpivot/unpivot.go @@ -64,7 +64,12 @@ func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric { results := make([]telegraf.Metric, 0, fieldCount) for _, m := range metrics { - base := copyWithoutFields(m) + base := m + if wm, ok := m.(telegraf.UnwrappableMetric); ok { + base = wm.Unwrap() + } + base = copyWithoutFields(base) + for _, field := range m.FieldList() { newMetric := base.Copy() newMetric.AddField(p.ValueKey, field.Value) diff --git a/plugins/processors/unpivot/unpivot_test.go b/plugins/processors/unpivot/unpivot_test.go index 6d3c8795b..015251315 100644 --- a/plugins/processors/unpivot/unpivot_test.go +++ b/plugins/processors/unpivot/unpivot_test.go @@ -1,10 +1,13 @@ package unpivot import ( + "strconv" + "sync" "testing" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -219,3 +222,43 @@ func TestUnpivot_fieldMode(t *testing.T) { }) } } + +func TestTrackedMetricNotLost(t *testing.T) { + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, 3) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + input := make([]telegraf.Metric, 0, 3) + expected := make([]telegraf.Metric, 0, 6) + for i := 0; i < 3; i++ { + strI := strconv.Itoa(i) + + m := metric.New("m"+strI, map[string]string{}, map[string]interface{}{"x": int64(1), "y": int64(2)}, time.Unix(0, 0)) + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + + unpivot1 := metric.New("m"+strI, map[string]string{"name": "x"}, map[string]interface{}{"value": int64(1)}, time.Unix(0, 0)) + unpivot2 := metric.New("m"+strI, map[string]string{"name": "y"}, map[string]interface{}{"value": int64(2)}, time.Unix(0, 0)) + expected = append(expected, unpivot1, unpivot2) + } + + // Process expected metrics and compare with resulting metrics + plugin := &Unpivot{TagKey: "name", ValueKey: "value"} + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(input) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(input)) +}