diff --git a/plugins/processors/unpivot/unpivot.go b/plugins/processors/unpivot/unpivot.go index 3f41d6bb7..53bfb25ae 100644 --- a/plugins/processors/unpivot/unpivot.go +++ b/plugins/processors/unpivot/unpivot.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/processors" ) @@ -18,30 +19,15 @@ type Unpivot struct { ValueKey string `toml:"value_key"` } -func copyWithoutFields(metric telegraf.Metric) telegraf.Metric { - m := metric.Copy() - - fieldKeys := make([]string, 0, len(m.FieldList())) - for _, field := range m.FieldList() { - fieldKeys = append(fieldKeys, field.Key) - } - - for _, fk := range fieldKeys { - m.RemoveField(fk) - } - - return m -} - func (*Unpivot) SampleConfig() string { return sampleConfig } func (p *Unpivot) Init() error { switch p.FieldNameAs { - case "metric": case "", "tag": p.FieldNameAs = "tag" + case "metric": default: return fmt.Errorf("unrecognized metric mode: %q", p.FieldNameAs) } @@ -63,27 +49,28 @@ func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric { } results := make([]telegraf.Metric, 0, fieldCount) - for _, m := range metrics { - base := m - if wm, ok := m.(telegraf.UnwrappableMetric); ok { - base = wm.Unwrap() + for _, src := range metrics { + // Create a copy without fields and tracking information + base := metric.New(src.Name(), make(map[string]string), make(map[string]interface{}), src.Time()) + for _, t := range src.TagList() { + base.AddTag(t.Key, t.Value) } - base = copyWithoutFields(base) - for _, field := range m.FieldList() { - newMetric := base.Copy() - newMetric.AddField(p.ValueKey, field.Value) + // Create a new metric per field and add it to the output + for _, field := range src.FieldList() { + m := base.Copy() + m.AddField(p.ValueKey, field.Value) switch p.FieldNameAs { case "metric": - newMetric.SetName(field.Key) - case "", "tag": - newMetric.AddTag(p.TagKey, field.Key) + m.SetName(field.Key) + case "tag": + m.AddTag(p.TagKey, field.Key) } - results = append(results, newMetric) + results = append(results, m) } - m.Accept() + src.Accept() } return results } diff --git a/plugins/processors/unpivot/unpivot_test.go b/plugins/processors/unpivot/unpivot_test.go index 015251315..b44632db3 100644 --- a/plugins/processors/unpivot/unpivot_test.go +++ b/plugins/processors/unpivot/unpivot_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestUnpivot_defaults(t *testing.T) { +func TestDefaults(t *testing.T) { unpivot := &Unpivot{} require.NoError(t, unpivot.Init()) require.Equal(t, "tag", unpivot.FieldNameAs) @@ -20,25 +20,25 @@ func TestUnpivot_defaults(t *testing.T) { require.Equal(t, "value", unpivot.ValueKey) } -func TestUnpivot_invalidMetricMode(t *testing.T) { +func TestInvalidMetricMode(t *testing.T) { unpivot := &Unpivot{FieldNameAs: "unknown"} require.Error(t, unpivot.Init()) } -func TestUnpivot_originalMode(t *testing.T) { +func TestOriginalMode(t *testing.T) { now := time.Now() tests := []struct { name string - unpivot *Unpivot + tagKey string + valueKey string + metrics []telegraf.Metric expected []telegraf.Metric }{ { - name: "simple", - unpivot: &Unpivot{ - TagKey: "name", - ValueKey: "value", - }, + name: "simple", + tagKey: "name", + valueKey: "value", metrics: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{}, @@ -61,11 +61,9 @@ func TestUnpivot_originalMode(t *testing.T) { }, }, { - name: "multi fields", - unpivot: &Unpivot{ - TagKey: "name", - ValueKey: "value", - }, + name: "multi fields", + tagKey: "name", + valueKey: "value", metrics: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{}, @@ -100,27 +98,33 @@ func TestUnpivot_originalMode(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual := tt.unpivot.Apply(tt.metrics...) + plugin := &Unpivot{ + TagKey: tt.tagKey, + ValueKey: tt.valueKey, + } + require.NoError(t, plugin.Init()) + + actual := plugin.Apply(tt.metrics...) testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics()) }) } } -func TestUnpivot_fieldMode(t *testing.T) { +func TestFieldMode(t *testing.T) { now := time.Now() tests := []struct { - name string - unpivot *Unpivot - metrics []telegraf.Metric - expected []telegraf.Metric + name string + fieldNameAs string + tagKey string + valueKey string + metrics []telegraf.Metric + expected []telegraf.Metric }{ { - name: "simple", - unpivot: &Unpivot{ - FieldNameAs: "metric", - TagKey: "name", - ValueKey: "value", - }, + name: "simple", + fieldNameAs: "metric", + tagKey: "name", + valueKey: "value", metrics: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{}, @@ -141,12 +145,10 @@ func TestUnpivot_fieldMode(t *testing.T) { }, }, { - name: "multi fields", - unpivot: &Unpivot{ - FieldNameAs: "metric", - TagKey: "name", - ValueKey: "value", - }, + name: "multi fields", + fieldNameAs: "metric", + tagKey: "name", + valueKey: "value", metrics: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{}, @@ -175,12 +177,10 @@ func TestUnpivot_fieldMode(t *testing.T) { }, }, { - name: "multi fields and tags", - unpivot: &Unpivot{ - FieldNameAs: "metric", - TagKey: "name", - ValueKey: "value", - }, + name: "multi fields and tags", + fieldNameAs: "metric", + tagKey: "name", + valueKey: "value", metrics: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ @@ -217,7 +217,14 @@ func TestUnpivot_fieldMode(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual := tt.unpivot.Apply(tt.metrics...) + plugin := &Unpivot{ + FieldNameAs: tt.fieldNameAs, + TagKey: tt.tagKey, + ValueKey: tt.valueKey, + } + require.NoError(t, plugin.Init()) + + actual := plugin.Apply(tt.metrics...) testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics()) }) } @@ -247,6 +254,8 @@ func TestTrackedMetricNotLost(t *testing.T) { // Process expected metrics and compare with resulting metrics plugin := &Unpivot{TagKey: "name", ValueKey: "value"} + require.NoError(t, plugin.Init()) + actual := plugin.Apply(input...) testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) @@ -262,3 +271,63 @@ func TestTrackedMetricNotLost(t *testing.T) { return len(input) == len(delivered) }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(input)) } + +func BenchmarkAsTag(b *testing.B) { + input := metric.New( + "test", + map[string]string{ + "source": "device A", + "location": "main building", + }, + map[string]interface{}{ + "field0": 0.1, + "field1": 1.2, + "field2": 2.3, + "field3": 3.4, + "field4": 4.5, + "field5": 5.6, + "field6": 6.7, + "field7": 7.8, + "field8": 8.9, + "field9": 9.0, + }, + time.Now(), + ) + + plugin := &Unpivot{} + require.NoError(b, plugin.Init()) + + for n := 0; n < b.N; n++ { + plugin.Apply(input) + } +} + +func BenchmarkAsMetric(b *testing.B) { + input := metric.New( + "test", + map[string]string{ + "source": "device A", + "location": "main building", + }, + map[string]interface{}{ + "field0": 0.1, + "field1": 1.2, + "field2": 2.3, + "field3": 3.4, + "field4": 4.5, + "field5": 5.6, + "field6": 6.7, + "field7": 7.8, + "field8": 8.9, + "field9": 9.0, + }, + time.Now(), + ) + + plugin := &Unpivot{FieldNameAs: "metric"} + require.NoError(b, plugin.Init()) + + for n := 0; n < b.N; n++ { + plugin.Apply(input) + } +}