fix(processors.unpivot): Handle tracking metrics correctly (#14832)

This commit is contained in:
Dane Strandboge 2024-02-18 15:31:43 -06:00 committed by GitHub
parent 35e79fd824
commit e490983fe1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 1 deletions

View File

@ -64,7 +64,12 @@ func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
results := make([]telegraf.Metric, 0, fieldCount)
for _, m := range metrics {
base := copyWithoutFields(m)
base := m
if wm, ok := m.(telegraf.UnwrappableMetric); ok {
base = wm.Unwrap()
}
base = copyWithoutFields(base)
for _, field := range m.FieldList() {
newMetric := base.Copy()
newMetric.AddField(p.ValueKey, field.Value)

View File

@ -1,10 +1,13 @@
package unpivot
import (
"strconv"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
@ -219,3 +222,43 @@ func TestUnpivot_fieldMode(t *testing.T) {
})
}
}
func TestTrackedMetricNotLost(t *testing.T) {
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, 3)
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
input := make([]telegraf.Metric, 0, 3)
expected := make([]telegraf.Metric, 0, 6)
for i := 0; i < 3; i++ {
strI := strconv.Itoa(i)
m := metric.New("m"+strI, map[string]string{}, map[string]interface{}{"x": int64(1), "y": int64(2)}, time.Unix(0, 0))
tm, _ := metric.WithTracking(m, notify)
input = append(input, tm)
unpivot1 := metric.New("m"+strI, map[string]string{"name": "x"}, map[string]interface{}{"value": int64(1)}, time.Unix(0, 0))
unpivot2 := metric.New("m"+strI, map[string]string{"name": "y"}, map[string]interface{}{"value": int64(2)}, time.Unix(0, 0))
expected = append(expected, unpivot1, unpivot2)
}
// Process expected metrics and compare with resulting metrics
plugin := &Unpivot{TagKey: "name", ValueKey: "value"}
actual := plugin.Apply(input...)
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
// Simulate output acknowledging delivery
for _, m := range actual {
m.Accept()
}
// Check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(input))
}