From 442728b03e2f8baf9f73bad61566292d68a6872b Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 17 May 2022 22:08:03 +0200 Subject: [PATCH] feat(parsers/logfmt): Add tag support (#11060) --- plugins/parsers/logfmt/README.md | 5 +- plugins/parsers/logfmt/parser.go | 28 +++++++-- plugins/parsers/logfmt/parser_test.go | 85 ++++++++++++++++++++++++--- plugins/parsers/registry.go | 11 +++- 4 files changed, 113 insertions(+), 16 deletions(-) diff --git a/plugins/parsers/logfmt/README.md b/plugins/parsers/logfmt/README.md index 4b19aa347..f4c8232e5 100644 --- a/plugins/parsers/logfmt/README.md +++ b/plugins/parsers/logfmt/README.md @@ -15,6 +15,9 @@ The `logfmt` data format parses data in [logfmt] format. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "logfmt" + + ## Array of key names which should be collected as tags. Globs accepted. + logfmt_tag_keys = ["method","host"] ``` ## Metrics @@ -26,5 +29,5 @@ of the field is automatically determined based on the contents of the value. ```text - method=GET host=example.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653 -+ logfmt method="GET",host="example.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i ++ logfmt,host=example.org,method=GET ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i ``` diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 01da916a2..f612c8e2e 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -8,6 +8,7 @@ import ( "github.com/go-logfmt/logfmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/metric" ) @@ -17,17 +18,22 @@ var ( // Parser decodes logfmt formatted messages into metrics. type Parser struct { + TagKeys []string `toml:"logfmt_tag_keys"` + MetricName string DefaultTags map[string]string Now func() time.Time + + tagFilter filter.Filter } // NewParser creates a parser. -func NewParser(metricName string, defaultTags map[string]string) *Parser { +func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser { return &Parser{ MetricName: metricName, DefaultTags: defaultTags, Now: time.Now, + TagKeys: tagKeys, } } @@ -46,6 +52,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { break } fields := make(map[string]interface{}) + tags := make(map[string]string) for decoder.ScanKeyval() { if string(decoder.Value()) == "" { continue @@ -53,7 +60,9 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { //type conversions value := string(decoder.Value()) - if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + if p.tagFilter != nil && p.tagFilter.Match(string(decoder.Key())) { + tags[string(decoder.Key())] = value + } else if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { fields[string(decoder.Key())] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { fields[string(decoder.Key())] = fValue @@ -63,11 +72,11 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { fields[string(decoder.Key())] = value } } - if len(fields) == 0 { + if len(fields) == 0 && len(tags) == 0 { continue } - m := metric.New(p.MetricName, map[string]string{}, fields, p.Now()) + m := metric.New(p.MetricName, tags, fields, p.Now()) metrics = append(metrics, m) } @@ -106,3 +115,14 @@ func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { } } } + +func (p *Parser) Init() error { + var err error + + // Compile tag key patterns + if p.tagFilter, err = filter.Compile(p.TagKeys); err != nil { + return fmt.Errorf("error compiling tag pattern: %w", err) + } + + return nil +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index f2a717489..a2ee8178f 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -5,17 +5,10 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" ) -func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { - t.Helper() - v := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) - - return v -} - func TestParse(t *testing.T) { tests := []struct { name string @@ -222,3 +215,79 @@ func TestParseLine(t *testing.T) { }) } } + +func TestTags(t *testing.T) { + tests := []struct { + name string + measurement string + tagKeys []string + s string + want telegraf.Metric + wantErr bool + }{ + { + name: "logfmt parser returns tags and fields", + measurement: "testlog", + tagKeys: []string{"lvl"}, + s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST", + want: testutil.MustMetric( + "testlog", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{ + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + time.Unix(0, 0), + ), + }, + { + name: "logfmt parser returns no empty metrics", + measurement: "testlog", + tagKeys: []string{"lvl"}, + s: "lvl=info", + want: testutil.MustMetric( + "testlog", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + { + name: "logfmt parser returns all keys as tag", + measurement: "testlog", + tagKeys: []string{"*"}, + s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST", + want: testutil.MustMetric( + "testlog", + map[string]string{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys) + assert.NoError(t, l.Init()) + + got, err := l.ParseLine(tt.s) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime()) + }) + } +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index b56b6d094..e880560b3 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -195,6 +195,9 @@ type Config struct { // Influx configuration InfluxParserType string `toml:"influx_parser_type"` + + // LogFmt configuration + LogFmtTagKeys []string `toml:"logfmt_tag_keys"` } type XPathConfig xpath.Config @@ -262,7 +265,7 @@ func NewParser(config *Config) (Parser, error) { config.GrokTimezone, config.GrokUniqueTimestamp) case "logfmt": - parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) + parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys) case "form_urlencoded": parser, err = NewFormUrlencodedParser( config.MetricName, @@ -389,8 +392,10 @@ func NewDropwizardParser( } // NewLogFmtParser returns a logfmt parser with the default options. -func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) { - return logfmt.NewParser(metricName, defaultTags), nil +func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) { + parser := logfmt.NewParser(metricName, defaultTags, tagKeys) + err := parser.Init() + return parser, err } func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {