diff --git a/plugins/processors/parser/README.md b/plugins/processors/parser/README.md index fbd242004..529315075 100644 --- a/plugins/processors/parser/README.md +++ b/plugins/processors/parser/README.md @@ -26,10 +26,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If true, incoming metrics are not emitted. # drop_original = false - ## If set to override, emitted metrics will be merged by overriding the - ## original metric using the newly parsed metrics. - ## Only has effect when drop_original is set to false. - merge = "override" + ## Merge Behavior + ## Only has effect when drop_original is set to false. Possible options + ## include: + ## * override: emitted metrics are merged by overriding the original metric + ## using the newly parsed metrics, but retains the original metric + ## timestamp. + ## * override-with-timestamp: the same as "override", but the timestamp is + ## set based on the new metrics if present. + # merge = "" ## The dataformat to be read from files ## Each data format has its own unique set of configuration options, read diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go index 95154c24a..182a954fc 100644 --- a/plugins/processors/parser/parser.go +++ b/plugins/processors/parser/parser.go @@ -3,6 +3,7 @@ package parser import ( _ "embed" + "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/processors" @@ -20,6 +21,16 @@ type Parser struct { parser telegraf.Parser } +func (p *Parser) Init() error { + switch p.Merge { + case "", "override", "override-with-timestamp": + default: + return fmt.Errorf("unrecognized merge value: %s", p.Merge) + } + + return nil +} + func (*Parser) SampleConfig() string { return sampleConfig } @@ -100,6 +111,8 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { if p.Merge == "override" { results = append(results, merge(newMetrics[0], newMetrics[1:])) + } else if p.Merge == "override-with-timestamp" { + results = append(results, mergeWithTimestamp(newMetrics[0], newMetrics[1:])) } else { results = append(results, newMetrics...) } @@ -120,6 +133,22 @@ func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric { return base } +func mergeWithTimestamp(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric { + for _, metric := range metrics { + for _, field := range metric.FieldList() { + base.AddField(field.Key, field.Value) + } + for _, tag := range metric.TagList() { + base.AddTag(tag.Key, tag.Value) + } + base.SetName(metric.Name()) + if !metric.Time().IsZero() { + base.SetTime(metric.Time()) + } + } + return base +} + func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) { return p.parser.Parse([]byte(value)) } diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go index 6abf71a6a..ec135dcc8 100644 --- a/plugins/processors/parser/parser_test.go +++ b/plugins/processors/parser/parser_test.go @@ -603,6 +603,31 @@ func TestApply(t *testing.T) { time.Unix(0, 0)), }, }, + { + name: "override with timestamp", + parseFields: []string{"value"}, + merge: "override-with-timestamp", + parser: &json.Parser{ + TimeKey: "timestamp", + TimeFormat: "2006-01-02 15:04:05", + }, + input: metric.New( + "myname", + map[string]string{}, + map[string]interface{}{ + "value": `{"timestamp": "2020-06-27 19:43:40", "value": 42.1}`, + }, + time.Unix(0, 0)), + expected: []telegraf.Metric{ + metric.New( + "myname", + map[string]string{}, + map[string]interface{}{ + "value": float64(42.1), + }, + time.Unix(1593287020, 0)), + }, + }, } for _, tt := range tests { @@ -621,11 +646,22 @@ func TestApply(t *testing.T) { output := plugin.Apply(tt.input) t.Logf("Testing: %s", tt.name) - testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime()) + + // check timestamp when using with-timestamp merge type + if tt.merge == "override-with-timestamp" { + testutil.RequireMetricsEqual(t, tt.expected, output) + } else { + testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime()) + } }) } } +func TestInvalidMerge(t *testing.T) { + plugin := Parser{Merge: "fake"} + require.Error(t, plugin.Init()) +} + func TestBadApply(t *testing.T) { tests := []struct { name string diff --git a/plugins/processors/parser/sample.conf b/plugins/processors/parser/sample.conf index 4fa9a5572..5275f282a 100644 --- a/plugins/processors/parser/sample.conf +++ b/plugins/processors/parser/sample.conf @@ -9,10 +9,15 @@ ## If true, incoming metrics are not emitted. # drop_original = false - ## If set to override, emitted metrics will be merged by overriding the - ## original metric using the newly parsed metrics. - ## Only has effect when drop_original is set to false. - merge = "override" + ## Merge Behavior + ## Only has effect when drop_original is set to false. Possible options + ## include: + ## * override: emitted metrics are merged by overriding the original metric + ## using the newly parsed metrics, but retains the original metric + ## timestamp. + ## * override-with-timestamp: the same as "override", but the timestamp is + ## set based on the new metrics if present. + # merge = "" ## The dataformat to be read from files ## Each data format has its own unique set of configuration options, read