feat(processors.parser): Add option to parse tags (#11228)

This commit is contained in:
Thomas Casteleyn 2022-09-12 17:37:03 +02:00 committed by GitHub
parent b59066ba44
commit d976158fa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 22 deletions

View File

@ -1,21 +1,25 @@
# Parser Processor Plugin
This plugin parses defined fields containing the specified data format and
creates new metrics based on the contents of the field.
This plugin parses defined fields or tags containing the specified data format
and creates new metrics based on the contents of the field or tag.
## Configuration
```toml @sample.conf
# Parse a value in a specified field/tag(s) and add the result in a new metric
# Parse a value in a specified field(s)/tag(s) and add the result in a new metric
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]
## The name of the tags whose value will be parsed.
# parse_tags = []
## If true, incoming metrics are not emitted.
drop_original = false
# drop_original = false
## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
## Only has effect when drop_original is set to false.
merge = "override"
## The dataformat to be read from files

View File

@ -20,6 +20,7 @@ type Parser struct {
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
ParseTags []string `toml:"parse_tags"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
}
@ -47,12 +48,13 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
newMetrics = append(newMetrics, metric)
}
// parse fields
for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key == key {
switch value := field.Value.(type) {
case string:
fromFieldMetric, err := p.parseField(value)
fromFieldMetric, err := p.parseValue(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
}
@ -74,6 +76,24 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
}
}
// parse tags
for _, key := range p.ParseTags {
if value, ok := metric.GetTag(key); ok {
fromTagMetric, err := p.parseValue(value)
if err != nil {
p.Log.Errorf("could not parse tag %s: %v", key, err)
}
for _, m := range fromTagMetric {
if m.Name() == "" {
m.SetName(metric.Name())
}
}
newMetrics = append(newMetrics, fromTagMetric...)
}
}
if len(newMetrics) == 0 {
continue
}
@ -100,7 +120,7 @@ func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
return base
}
func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) {
return p.parser.Parse([]byte(value))
}

View File

@ -12,24 +12,13 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// compares metrics without comparing time
func compareMetrics(t *testing.T, expected, actual []telegraf.Metric) {
require.Equal(t, len(expected), len(actual))
for i, m := range actual {
require.Equal(t, expected[i].Name(), m.Name())
require.Equal(t, expected[i].Fields(), m.Fields())
require.Equal(t, expected[i].Tags(), m.Tags())
}
}
func TestApply(t *testing.T) {
tests := []struct {
name string
parseFields []string
parseTags []string
config parsers.Config
dropOriginal bool
merge string
@ -361,6 +350,93 @@ func TestApply(t *testing.T) {
time.Unix(0, 0)),
},
},
{
name: "parse one tag drop original",
parseTags: []string{"sample"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "parse one tag with merge",
parseTags: []string{"sample"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "parse one tag keep",
parseTags: []string{"sample"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "logfmt",
},
input: metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleTag",
map[string]string{
"some": "tag",
"sample": `ts=2018-07-24T19:43:40.275Z`,
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"singleTag",
map[string]string{},
map[string]interface{}{
"ts": "2018-07-24T19:43:40.275Z",
},
time.Unix(0, 0)),
},
},
{
name: "Fail to parse one field but parses other [keep]",
parseFields: []string{"good", "bad"},
@ -506,6 +582,7 @@ func TestApply(t *testing.T) {
parser := Parser{
Config: tt.config,
ParseFields: tt.parseFields,
ParseTags: tt.parseTags,
DropOriginal: tt.dropOriginal,
Merge: tt.merge,
Log: testutil.Logger{Name: "processor.parser"},
@ -513,7 +590,7 @@ func TestApply(t *testing.T) {
output := parser.Apply(tt.input)
t.Logf("Testing: %s", tt.name)
compareMetrics(t, tt.expected, output)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
}
}
@ -584,7 +661,7 @@ func TestBadApply(t *testing.T) {
output := parser.Apply(tt.input)
compareMetrics(t, output, tt.expected)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
}
}

View File

@ -1,13 +1,17 @@
# Parse a value in a specified field/tag(s) and add the result in a new metric
# Parse a value in a specified field(s)/tag(s) and add the result in a new metric
[[processors.parser]]
## The name of the fields whose value will be parsed.
parse_fields = ["message"]
## The name of the tags whose value will be parsed.
# parse_tags = []
## If true, incoming metrics are not emitted.
drop_original = false
# drop_original = false
## If set to override, emitted metrics will be merged by overriding the
## original metric using the newly parsed metrics.
## Only has effect when drop_original is set to false.
merge = "override"
## The dataformat to be read from files