chore(processors.unpivot): Cleanup code and improve performance (#16299)

This commit is contained in:
Sven Rebhan 2024-12-13 18:23:50 +01:00 committed by GitHub
parent e15a3c8dc6
commit 2bd4559bc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 124 additions and 68 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
) )
@ -18,30 +19,15 @@ type Unpivot struct {
ValueKey string `toml:"value_key"` ValueKey string `toml:"value_key"`
} }
func copyWithoutFields(metric telegraf.Metric) telegraf.Metric {
m := metric.Copy()
fieldKeys := make([]string, 0, len(m.FieldList()))
for _, field := range m.FieldList() {
fieldKeys = append(fieldKeys, field.Key)
}
for _, fk := range fieldKeys {
m.RemoveField(fk)
}
return m
}
func (*Unpivot) SampleConfig() string { func (*Unpivot) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (p *Unpivot) Init() error { func (p *Unpivot) Init() error {
switch p.FieldNameAs { switch p.FieldNameAs {
case "metric":
case "", "tag": case "", "tag":
p.FieldNameAs = "tag" p.FieldNameAs = "tag"
case "metric":
default: default:
return fmt.Errorf("unrecognized metric mode: %q", p.FieldNameAs) return fmt.Errorf("unrecognized metric mode: %q", p.FieldNameAs)
} }
@ -63,27 +49,28 @@ func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
} }
results := make([]telegraf.Metric, 0, fieldCount) results := make([]telegraf.Metric, 0, fieldCount)
for _, m := range metrics { for _, src := range metrics {
base := m // Create a copy without fields and tracking information
if wm, ok := m.(telegraf.UnwrappableMetric); ok { base := metric.New(src.Name(), make(map[string]string), make(map[string]interface{}), src.Time())
base = wm.Unwrap() for _, t := range src.TagList() {
base.AddTag(t.Key, t.Value)
} }
base = copyWithoutFields(base)
for _, field := range m.FieldList() { // Create a new metric per field and add it to the output
newMetric := base.Copy() for _, field := range src.FieldList() {
newMetric.AddField(p.ValueKey, field.Value) m := base.Copy()
m.AddField(p.ValueKey, field.Value)
switch p.FieldNameAs { switch p.FieldNameAs {
case "metric": case "metric":
newMetric.SetName(field.Key) m.SetName(field.Key)
case "", "tag": case "tag":
newMetric.AddTag(p.TagKey, field.Key) m.AddTag(p.TagKey, field.Key)
} }
results = append(results, newMetric) results = append(results, m)
} }
m.Accept() src.Accept()
} }
return results return results
} }

View File

@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestUnpivot_defaults(t *testing.T) { func TestDefaults(t *testing.T) {
unpivot := &Unpivot{} unpivot := &Unpivot{}
require.NoError(t, unpivot.Init()) require.NoError(t, unpivot.Init())
require.Equal(t, "tag", unpivot.FieldNameAs) require.Equal(t, "tag", unpivot.FieldNameAs)
@ -20,25 +20,25 @@ func TestUnpivot_defaults(t *testing.T) {
require.Equal(t, "value", unpivot.ValueKey) require.Equal(t, "value", unpivot.ValueKey)
} }
func TestUnpivot_invalidMetricMode(t *testing.T) { func TestInvalidMetricMode(t *testing.T) {
unpivot := &Unpivot{FieldNameAs: "unknown"} unpivot := &Unpivot{FieldNameAs: "unknown"}
require.Error(t, unpivot.Init()) require.Error(t, unpivot.Init())
} }
func TestUnpivot_originalMode(t *testing.T) { func TestOriginalMode(t *testing.T) {
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
name string name string
unpivot *Unpivot tagKey string
valueKey string
metrics []telegraf.Metric metrics []telegraf.Metric
expected []telegraf.Metric expected []telegraf.Metric
}{ }{
{ {
name: "simple", name: "simple",
unpivot: &Unpivot{ tagKey: "name",
TagKey: "name", valueKey: "value",
ValueKey: "value",
},
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{}, map[string]string{},
@ -62,10 +62,8 @@ func TestUnpivot_originalMode(t *testing.T) {
}, },
{ {
name: "multi fields", name: "multi fields",
unpivot: &Unpivot{ tagKey: "name",
TagKey: "name", valueKey: "value",
ValueKey: "value",
},
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{}, map[string]string{},
@ -100,27 +98,33 @@ func TestUnpivot_originalMode(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
actual := tt.unpivot.Apply(tt.metrics...) plugin := &Unpivot{
TagKey: tt.tagKey,
ValueKey: tt.valueKey,
}
require.NoError(t, plugin.Init())
actual := plugin.Apply(tt.metrics...)
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics()) testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics())
}) })
} }
} }
func TestUnpivot_fieldMode(t *testing.T) { func TestFieldMode(t *testing.T) {
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
name string name string
unpivot *Unpivot fieldNameAs string
tagKey string
valueKey string
metrics []telegraf.Metric metrics []telegraf.Metric
expected []telegraf.Metric expected []telegraf.Metric
}{ }{
{ {
name: "simple", name: "simple",
unpivot: &Unpivot{ fieldNameAs: "metric",
FieldNameAs: "metric", tagKey: "name",
TagKey: "name", valueKey: "value",
ValueKey: "value",
},
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{}, map[string]string{},
@ -142,11 +146,9 @@ func TestUnpivot_fieldMode(t *testing.T) {
}, },
{ {
name: "multi fields", name: "multi fields",
unpivot: &Unpivot{ fieldNameAs: "metric",
FieldNameAs: "metric", tagKey: "name",
TagKey: "name", valueKey: "value",
ValueKey: "value",
},
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{}, map[string]string{},
@ -176,11 +178,9 @@ func TestUnpivot_fieldMode(t *testing.T) {
}, },
{ {
name: "multi fields and tags", name: "multi fields and tags",
unpivot: &Unpivot{ fieldNameAs: "metric",
FieldNameAs: "metric", tagKey: "name",
TagKey: "name", valueKey: "value",
ValueKey: "value",
},
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{ map[string]string{
@ -217,7 +217,14 @@ func TestUnpivot_fieldMode(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
actual := tt.unpivot.Apply(tt.metrics...) plugin := &Unpivot{
FieldNameAs: tt.fieldNameAs,
TagKey: tt.tagKey,
ValueKey: tt.valueKey,
}
require.NoError(t, plugin.Init())
actual := plugin.Apply(tt.metrics...)
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics()) testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics())
}) })
} }
@ -247,6 +254,8 @@ func TestTrackedMetricNotLost(t *testing.T) {
// Process expected metrics and compare with resulting metrics // Process expected metrics and compare with resulting metrics
plugin := &Unpivot{TagKey: "name", ValueKey: "value"} plugin := &Unpivot{TagKey: "name", ValueKey: "value"}
require.NoError(t, plugin.Init())
actual := plugin.Apply(input...) actual := plugin.Apply(input...)
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
@ -262,3 +271,63 @@ func TestTrackedMetricNotLost(t *testing.T) {
return len(input) == len(delivered) return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(input)) }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(input))
} }
func BenchmarkAsTag(b *testing.B) {
input := metric.New(
"test",
map[string]string{
"source": "device A",
"location": "main building",
},
map[string]interface{}{
"field0": 0.1,
"field1": 1.2,
"field2": 2.3,
"field3": 3.4,
"field4": 4.5,
"field5": 5.6,
"field6": 6.7,
"field7": 7.8,
"field8": 8.9,
"field9": 9.0,
},
time.Now(),
)
plugin := &Unpivot{}
require.NoError(b, plugin.Init())
for n := 0; n < b.N; n++ {
plugin.Apply(input)
}
}
func BenchmarkAsMetric(b *testing.B) {
input := metric.New(
"test",
map[string]string{
"source": "device A",
"location": "main building",
},
map[string]interface{}{
"field0": 0.1,
"field1": 1.2,
"field2": 2.3,
"field3": 3.4,
"field4": 4.5,
"field5": 5.6,
"field6": 6.7,
"field7": 7.8,
"field8": 8.9,
"field9": 9.0,
},
time.Now(),
)
plugin := &Unpivot{FieldNameAs: "metric"}
require.NoError(b, plugin.Init())
for n := 0; n < b.N; n++ {
plugin.Apply(input)
}
}