feat(processors.parser): Add merge with timestamp option (#13147)

This commit is contained in:
Joshua Powers 2023-05-03 01:50:32 -06:00 committed by GitHub
parent e9f55a848f
commit 6dd3bcb41a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 9 deletions

View File

@ -26,10 +26,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If true, incoming metrics are not emitted. ## If true, incoming metrics are not emitted.
# drop_original = false # drop_original = false
## If set to override, emitted metrics will be merged by overriding the ## Merge Behavior
## original metric using the newly parsed metrics. ## Only has effect when drop_original is set to false. Possible options
## Only has effect when drop_original is set to false. ## include:
merge = "override" ## * override: emitted metrics are merged by overriding the original metric
## using the newly parsed metrics, but retains the original metric
## timestamp.
## * override-with-timestamp: the same as "override", but the timestamp is
## set based on the new metrics if present.
# merge = ""
## The dataformat to be read from files ## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read

View File

@ -3,6 +3,7 @@ package parser
import ( import (
_ "embed" _ "embed"
"fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
@ -20,6 +21,16 @@ type Parser struct {
parser telegraf.Parser parser telegraf.Parser
} }
func (p *Parser) Init() error {
switch p.Merge {
case "", "override", "override-with-timestamp":
default:
return fmt.Errorf("unrecognized merge value: %s", p.Merge)
}
return nil
}
func (*Parser) SampleConfig() string { func (*Parser) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -100,6 +111,8 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if p.Merge == "override" { if p.Merge == "override" {
results = append(results, merge(newMetrics[0], newMetrics[1:])) results = append(results, merge(newMetrics[0], newMetrics[1:]))
} else if p.Merge == "override-with-timestamp" {
results = append(results, mergeWithTimestamp(newMetrics[0], newMetrics[1:]))
} else { } else {
results = append(results, newMetrics...) results = append(results, newMetrics...)
} }
@ -120,6 +133,22 @@ func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
return base return base
} }
func mergeWithTimestamp(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
for _, metric := range metrics {
for _, field := range metric.FieldList() {
base.AddField(field.Key, field.Value)
}
for _, tag := range metric.TagList() {
base.AddTag(tag.Key, tag.Value)
}
base.SetName(metric.Name())
if !metric.Time().IsZero() {
base.SetTime(metric.Time())
}
}
return base
}
func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) { func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) {
return p.parser.Parse([]byte(value)) return p.parser.Parse([]byte(value))
} }

View File

@ -603,6 +603,31 @@ func TestApply(t *testing.T) {
time.Unix(0, 0)), time.Unix(0, 0)),
}, },
}, },
{
name: "override with timestamp",
parseFields: []string{"value"},
merge: "override-with-timestamp",
parser: &json.Parser{
TimeKey: "timestamp",
TimeFormat: "2006-01-02 15:04:05",
},
input: metric.New(
"myname",
map[string]string{},
map[string]interface{}{
"value": `{"timestamp": "2020-06-27 19:43:40", "value": 42.1}`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"myname",
map[string]string{},
map[string]interface{}{
"value": float64(42.1),
},
time.Unix(1593287020, 0)),
},
},
} }
for _, tt := range tests { for _, tt := range tests {
@ -621,11 +646,22 @@ func TestApply(t *testing.T) {
output := plugin.Apply(tt.input) output := plugin.Apply(tt.input)
t.Logf("Testing: %s", tt.name) t.Logf("Testing: %s", tt.name)
// check timestamp when using with-timestamp merge type
if tt.merge == "override-with-timestamp" {
testutil.RequireMetricsEqual(t, tt.expected, output)
} else {
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime()) testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
}
}) })
} }
} }
func TestInvalidMerge(t *testing.T) {
plugin := Parser{Merge: "fake"}
require.Error(t, plugin.Init())
}
func TestBadApply(t *testing.T) { func TestBadApply(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -9,10 +9,15 @@
## If true, incoming metrics are not emitted. ## If true, incoming metrics are not emitted.
# drop_original = false # drop_original = false
## If set to override, emitted metrics will be merged by overriding the ## Merge Behavior
## original metric using the newly parsed metrics. ## Only has effect when drop_original is set to false. Possible options
## Only has effect when drop_original is set to false. ## include:
merge = "override" ## * override: emitted metrics are merged by overriding the original metric
## using the newly parsed metrics, but retains the original metric
## timestamp.
## * override-with-timestamp: the same as "override", but the timestamp is
## set based on the new metrics if present.
# merge = ""
## The dataformat to be read from files ## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read