feat(parsers/logfmt): Add tag support (#11060)
This commit is contained in:
parent
1b10e15424
commit
442728b03e
|
|
@ -15,6 +15,9 @@ The `logfmt` data format parses data in [logfmt] format.
|
||||||
## more about them here:
|
## more about them here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
data_format = "logfmt"
|
data_format = "logfmt"
|
||||||
|
|
||||||
|
## Array of key names which should be collected as tags. Globs accepted.
|
||||||
|
logfmt_tag_keys = ["method","host"]
|
||||||
```
|
```
|
||||||
|
|
||||||
## Metrics
|
## Metrics
|
||||||
|
|
@ -26,5 +29,5 @@ of the field is automatically determined based on the contents of the value.
|
||||||
|
|
||||||
```text
|
```text
|
||||||
- method=GET host=example.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653
|
- method=GET host=example.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653
|
||||||
+ logfmt method="GET",host="example.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
|
+ logfmt,host=example.org,method=GET ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-logfmt/logfmt"
|
"github.com/go-logfmt/logfmt"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -17,17 +18,22 @@ var (
|
||||||
|
|
||||||
// Parser decodes logfmt formatted messages into metrics.
|
// Parser decodes logfmt formatted messages into metrics.
|
||||||
type Parser struct {
|
type Parser struct {
|
||||||
|
TagKeys []string `toml:"logfmt_tag_keys"`
|
||||||
|
|
||||||
MetricName string
|
MetricName string
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
Now func() time.Time
|
Now func() time.Time
|
||||||
|
|
||||||
|
tagFilter filter.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewParser creates a parser.
|
// NewParser creates a parser.
|
||||||
func NewParser(metricName string, defaultTags map[string]string) *Parser {
|
func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser {
|
||||||
return &Parser{
|
return &Parser{
|
||||||
MetricName: metricName,
|
MetricName: metricName,
|
||||||
DefaultTags: defaultTags,
|
DefaultTags: defaultTags,
|
||||||
Now: time.Now,
|
Now: time.Now,
|
||||||
|
TagKeys: tagKeys,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -46,6 +52,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
for decoder.ScanKeyval() {
|
for decoder.ScanKeyval() {
|
||||||
if string(decoder.Value()) == "" {
|
if string(decoder.Value()) == "" {
|
||||||
continue
|
continue
|
||||||
|
|
@ -53,7 +60,9 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
|
||||||
|
|
||||||
//type conversions
|
//type conversions
|
||||||
value := string(decoder.Value())
|
value := string(decoder.Value())
|
||||||
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
if p.tagFilter != nil && p.tagFilter.Match(string(decoder.Key())) {
|
||||||
|
tags[string(decoder.Key())] = value
|
||||||
|
} else if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||||
fields[string(decoder.Key())] = iValue
|
fields[string(decoder.Key())] = iValue
|
||||||
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
|
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
|
||||||
fields[string(decoder.Key())] = fValue
|
fields[string(decoder.Key())] = fValue
|
||||||
|
|
@ -63,11 +72,11 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
|
||||||
fields[string(decoder.Key())] = value
|
fields[string(decoder.Key())] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(fields) == 0 {
|
if len(fields) == 0 && len(tags) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m := metric.New(p.MetricName, map[string]string{}, fields, p.Now())
|
m := metric.New(p.MetricName, tags, fields, p.Now())
|
||||||
|
|
||||||
metrics = append(metrics, m)
|
metrics = append(metrics, m)
|
||||||
}
|
}
|
||||||
|
|
@ -106,3 +115,14 @@ func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Parser) Init() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Compile tag key patterns
|
||||||
|
if p.tagFilter, err = filter.Compile(p.TagKeys); err != nil {
|
||||||
|
return fmt.Errorf("error compiling tag pattern: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,17 +5,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
|
|
||||||
t.Helper()
|
|
||||||
v := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
|
|
||||||
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParse(t *testing.T) {
|
func TestParse(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -222,3 +215,79 @@ func TestParseLine(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTags(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
measurement string
|
||||||
|
tagKeys []string
|
||||||
|
s string
|
||||||
|
want telegraf.Metric
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "logfmt parser returns tags and fields",
|
||||||
|
measurement: "testlog",
|
||||||
|
tagKeys: []string{"lvl"},
|
||||||
|
s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST",
|
||||||
|
want: testutil.MustMetric(
|
||||||
|
"testlog",
|
||||||
|
map[string]string{
|
||||||
|
"lvl": "info",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"msg": "http request",
|
||||||
|
"method": "POST",
|
||||||
|
"ts": "2018-07-24T19:43:40.275Z",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "logfmt parser returns no empty metrics",
|
||||||
|
measurement: "testlog",
|
||||||
|
tagKeys: []string{"lvl"},
|
||||||
|
s: "lvl=info",
|
||||||
|
want: testutil.MustMetric(
|
||||||
|
"testlog",
|
||||||
|
map[string]string{
|
||||||
|
"lvl": "info",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "logfmt parser returns all keys as tag",
|
||||||
|
measurement: "testlog",
|
||||||
|
tagKeys: []string{"*"},
|
||||||
|
s: "ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST",
|
||||||
|
want: testutil.MustMetric(
|
||||||
|
"testlog",
|
||||||
|
map[string]string{
|
||||||
|
"lvl": "info",
|
||||||
|
"msg": "http request",
|
||||||
|
"method": "POST",
|
||||||
|
"ts": "2018-07-24T19:43:40.275Z",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys)
|
||||||
|
assert.NoError(t, l.Init())
|
||||||
|
|
||||||
|
got, err := l.ParseLine(tt.s)
|
||||||
|
|
||||||
|
if tt.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -195,6 +195,9 @@ type Config struct {
|
||||||
|
|
||||||
// Influx configuration
|
// Influx configuration
|
||||||
InfluxParserType string `toml:"influx_parser_type"`
|
InfluxParserType string `toml:"influx_parser_type"`
|
||||||
|
|
||||||
|
// LogFmt configuration
|
||||||
|
LogFmtTagKeys []string `toml:"logfmt_tag_keys"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type XPathConfig xpath.Config
|
type XPathConfig xpath.Config
|
||||||
|
|
@ -262,7 +265,7 @@ func NewParser(config *Config) (Parser, error) {
|
||||||
config.GrokTimezone,
|
config.GrokTimezone,
|
||||||
config.GrokUniqueTimestamp)
|
config.GrokUniqueTimestamp)
|
||||||
case "logfmt":
|
case "logfmt":
|
||||||
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
|
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys)
|
||||||
case "form_urlencoded":
|
case "form_urlencoded":
|
||||||
parser, err = NewFormUrlencodedParser(
|
parser, err = NewFormUrlencodedParser(
|
||||||
config.MetricName,
|
config.MetricName,
|
||||||
|
|
@ -389,8 +392,10 @@ func NewDropwizardParser(
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLogFmtParser returns a logfmt parser with the default options.
|
// NewLogFmtParser returns a logfmt parser with the default options.
|
||||||
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
|
func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) {
|
||||||
return logfmt.NewParser(metricName, defaultTags), nil
|
parser := logfmt.NewParser(metricName, defaultTags, tagKeys)
|
||||||
|
err := parser.Init()
|
||||||
|
return parser, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
|
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue