From f8766bc1b1412909379a6bbb655252eb8e8f080f Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 30 Jun 2022 00:11:27 +0200 Subject: [PATCH] feat: Migrate dropwizard parser to new style (#11371) --- config/config.go | 2 - plugins/parsers/all/all.go | 1 + plugins/parsers/dropwizard/parser.go | 137 ++++++++++------------ plugins/parsers/dropwizard/parser_test.go | 134 +++++++++++---------- plugins/parsers/registry.go | 36 ------ plugins/parsers/registry_test.go | 2 +- 6 files changed, 131 insertions(+), 181 deletions(-) diff --git a/config/config.go b/config/config.go index 6eb60e1d4..adab8ce3c 100644 --- a/config/config.go +++ b/config/config.go @@ -1426,8 +1426,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Parser options to ignore case "data_type", "separator", "tag_keys", // "templates", // shared with serializers - "dropwizard_metric_registry_path", "dropwizard_tags_path", "dropwizard_tag_paths", - "dropwizard_time_format", "dropwizard_time_path", "grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns", "grok_timezone", "grok_unique_timestamp", "influx_parser_type", diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index cd9cb6c6f..f9eec51ea 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -4,6 +4,7 @@ import ( //Blank imports for plugins to register themselves _ "github.com/influxdata/telegraf/plugins/parsers/collectd" _ "github.com/influxdata/telegraf/plugins/parsers/csv" + _ "github.com/influxdata/telegraf/plugins/parsers/dropwizard" _ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded" _ "github.com/influxdata/telegraf/plugins/parsers/graphite" _ "github.com/influxdata/telegraf/plugins/parsers/json" diff --git a/plugins/parsers/dropwizard/parser.go b/plugins/parsers/dropwizard/parser.go index 250f08a29..d4f50f33f 100644 --- a/plugins/parsers/dropwizard/parser.go +++ b/plugins/parsers/dropwizard/parser.go @@ -10,62 +10,32 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/templating" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" ) -type TimeFunc func() time.Time - // Parser parses json inputs containing dropwizard metrics, // either top-level or embedded inside a json field. // This parser is using gjson for retrieving paths within the json file. -type parser struct { - // an optional json path containing the metric registry object - // if left empty, the whole json object is parsed as a metric registry - MetricRegistryPath string +type Parser struct { + MetricRegistryPath string `toml:"dropwizard_metric_registry_path"` + TimePath string `toml:"dropwizard_time_path"` + TimeFormat string `toml:"dropwizard_time_format"` + TagsPath string `toml:"dropwizard_tags_path"` + TagPathsMap map[string]string `toml:"dropwizard_tag_paths_map"` + Separator string `toml:"separator"` + Templates []string `toml:"templates"` + DefaultTags map[string]string `toml:"-"` + Log telegraf.Logger `toml:"-"` - // an optional json path containing the default time of the metrics - // if left empty, or if cannot be parsed the current processing time is used as the time of the metrics - TimePath string - - // time format to use for parsing the time field - // defaults to time.RFC3339 - TimeFormat string - - // an optional json path pointing to a json object with tag key/value pairs - // takes precedence over TagPathsMap - TagsPath string - - // an optional map containing tag names as keys and json paths to retrieve the tag values from as values - // used if TagsPath is empty or doesn't return any tags - TagPathsMap map[string]string - - // an optional map of default tags to use for metrics - DefaultTags map[string]string - - Log telegraf.Logger `toml:"-"` - - separator string templateEngine *templating.Engine - timeFunc TimeFunc - // seriesParser parses line protocol measurement + tags seriesParser *influx.Parser } -func NewParser() *parser { - handler := influx.NewMetricHandler() - seriesParser := influx.NewSeriesParser(handler) - - parser := &parser{ - timeFunc: time.Now, - seriesParser: seriesParser, - } - return parser -} - // Parse parses the input bytes to an array of metrics -func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) metricTime, err := p.parseTime(buf) @@ -111,38 +81,17 @@ func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -func (p *parser) SetTemplates(separator string, templates []string) error { - if len(templates) == 0 { - p.templateEngine = nil - return nil - } - - defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*") - if err != nil { - return err - } - - templateEngine, err := templating.NewEngine(separator, defaultTemplate, templates) - if err != nil { - return err - } - - p.separator = separator - p.templateEngine = templateEngine - return nil -} - // ParseLine is not supported by the dropwizard format -func (p *parser) ParseLine(line string) (telegraf.Metric, error) { +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line) } // SetDefaultTags sets the default tags -func (p *parser) SetDefaultTags(tags map[string]string) { +func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } -func (p *parser) readTags(buf []byte) map[string]string { +func (p *Parser) readTags(buf []byte) map[string]string { if p.TagsPath != "" { var tagsBytes []byte tagsResult := gjson.GetBytes(buf, p.TagsPath) @@ -167,7 +116,7 @@ func (p *parser) readTags(buf []byte) map[string]string { return tags } -func (p *parser) parseTime(buf []byte) (time.Time, error) { +func (p *Parser) parseTime(buf []byte) (time.Time, error) { if p.TimePath != "" { timeFormat := p.TimeFormat if timeFormat == "" { @@ -175,20 +124,18 @@ func (p *parser) parseTime(buf []byte) (time.Time, error) { } timeString := gjson.GetBytes(buf, p.TimePath).String() if timeString == "" { - err := fmt.Errorf("time not found in JSON path %s", p.TimePath) - return p.timeFunc(), err + return time.Time{}, fmt.Errorf("time not found in JSON path %s", p.TimePath) } t, err := time.Parse(timeFormat, timeString) if err != nil { - err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err) - return p.timeFunc(), err + return time.Time{}, fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err) } return t.UTC(), nil } - return p.timeFunc(), nil + return time.Now(), nil } -func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) { +func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) { var registryBytes []byte if p.MetricRegistryPath != "" { regResult := gjson.GetBytes(buf, p.MetricRegistryPath) @@ -213,7 +160,7 @@ func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) { return jsonOut, nil } -func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric { +func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric { if dwmsTyped, ok := dwms.(map[string]interface{}); ok { for dwmName, dwmFields := range dwmsTyped { measurementName := dwmName @@ -222,7 +169,7 @@ func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []te if p.templateEngine != nil { measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName) if len(fieldPrefix) > 0 { - fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.separator) + fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator) } } @@ -258,6 +205,42 @@ func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []te return metrics } -func (p *parser) SetTimeFunc(f TimeFunc) { - p.timeFunc = f +func (p *Parser) Init() error { + handler := influx.NewMetricHandler() + p.seriesParser = influx.NewSeriesParser(handler) + + if len(p.Templates) != 0 { + defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*") + if err != nil { + return err + } + + templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates) + if err != nil { + return err + } + p.templateEngine = templateEngine + } + + return nil +} + +func init() { + parsers.Add("dropwizard", + func(defaultMetricName string) telegraf.Parser { + return &Parser{} + }) +} + +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.MetricRegistryPath = config.DropwizardMetricRegistryPath + p.TimePath = config.DropwizardTimePath + p.TimeFormat = config.DropwizardTimeFormat + p.TagsPath = config.DropwizardTagsPath + p.TagPathsMap = config.DropwizardTagPathsMap + p.Separator = config.Separator + p.Templates = append(p.Templates, config.Templates...) + p.DefaultTags = config.DefaultTags + + return nil } diff --git a/plugins/parsers/dropwizard/parser_test.go b/plugins/parsers/dropwizard/parser_test.go index dfd05f4b7..0c3abd1ee 100644 --- a/plugins/parsers/dropwizard/parser_test.go +++ b/plugins/parsers/dropwizard/parser_test.go @@ -2,20 +2,17 @@ package dropwizard import ( "fmt" - "github.com/influxdata/telegraf/testutil" "testing" "time" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) -var testTimeFunc = func() time.Time { - return time.Unix(0, 0) -} - // validEmptyJSON is a valid dropwizard json document, but without any metrics const validEmptyJSON = ` { @@ -29,7 +26,8 @@ const validEmptyJSON = ` ` func TestParseValidEmptyJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) // Most basic vanilla test metrics, err := parser.Parse([]byte(validEmptyJSON)) @@ -54,7 +52,8 @@ const validCounterJSON = ` ` func TestParseValidCounterJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validCounterJSON)) require.NoError(t, err) @@ -92,10 +91,12 @@ const validEmbeddedCounterJSON = ` func TestParseValidEmbeddedCounterJSON(t *testing.T) { timeFormat := "2006-01-02T15:04:05Z07:00" metricTime, _ := time.Parse(timeFormat, "2017-02-22T15:33:03.662+03:00") - parser := NewParser() - parser.MetricRegistryPath = "metrics" - parser.TagsPath = "tags" - parser.TimePath = "time" + parser := &Parser{ + MetricRegistryPath: "metrics", + TagsPath: "tags", + TimePath: "time", + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validEmbeddedCounterJSON)) require.NoError(t, err) @@ -113,10 +114,12 @@ func TestParseValidEmbeddedCounterJSON(t *testing.T) { require.True(t, metricTime.Equal(metrics[0].Time()), fmt.Sprintf("%s should be equal to %s", metrics[0].Time(), metricTime)) // now test json tags through TagPathsMap - parser2 := NewParser() - parser2.MetricRegistryPath = "metrics" - parser2.TagPathsMap = map[string]string{"tag1": "tags.tag1"} - parser2.TimePath = "time" + parser2 := &Parser{ + MetricRegistryPath: "metrics", + TagPathsMap: map[string]string{"tag1": "tags.tag1"}, + TimePath: "time", + } + require.NoError(t, parser2.Init()) metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON)) require.NoError(t, err2) require.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags()) @@ -144,7 +147,8 @@ const validMeterJSON1 = ` ` func TestParseValidMeterJSON1(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validMeterJSON1)) require.NoError(t, err) @@ -184,7 +188,8 @@ const validMeterJSON2 = ` ` func TestParseValidMeterJSON2(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validMeterJSON2)) require.NoError(t, err) @@ -218,7 +223,8 @@ const validGaugeJSON = ` ` func TestParseValidGaugeJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validGaugeJSON)) require.NoError(t, err) @@ -257,7 +263,8 @@ const validHistogramJSON = ` ` func TestParseValidHistogramJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validHistogramJSON)) require.NoError(t, err) @@ -312,7 +319,8 @@ const validTimerJSON = ` ` func TestParseValidTimerJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validTimerJSON)) require.NoError(t, err) @@ -363,7 +371,8 @@ const validAllJSON = ` ` func TestParseValidAllJSON(t *testing.T) { - parser := NewParser() + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(validAllJSON)) require.NoError(t, err) @@ -372,21 +381,26 @@ func TestParseValidAllJSON(t *testing.T) { func TestTagParsingProblems(t *testing.T) { // giving a wrong path results in empty tags - parser1 := NewParser() - parser1.MetricRegistryPath = "metrics" - parser1.TagsPath = "tags1" - parser1.Log = testutil.Logger{} + parser1 := &Parser{ + MetricRegistryPath: "metrics", + TagsPath: "tags1", + Log: testutil.Logger{}, + } + require.NoError(t, parser1.Init()) + metrics1, err1 := parser1.Parse([]byte(validEmbeddedCounterJSON)) require.NoError(t, err1) require.Len(t, metrics1, 1) require.Equal(t, map[string]string{"metric_type": "counter"}, metrics1[0].Tags()) // giving a wrong TagsPath falls back to TagPathsMap - parser2 := NewParser() - parser2.MetricRegistryPath = "metrics" - parser2.TagsPath = "tags1" - parser2.TagPathsMap = map[string]string{"tag1": "tags.tag1"} - parser2.Log = testutil.Logger{} + parser2 := &Parser{ + MetricRegistryPath: "metrics", + TagsPath: "tags1", + TagPathsMap: map[string]string{"tag1": "tags.tag1"}, + Log: testutil.Logger{}, + } + require.NoError(t, parser2.Init()) metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON)) require.NoError(t, err2) require.Len(t, metrics2, 1) @@ -431,12 +445,14 @@ const sampleTemplateJSON = ` ` func TestParseSampleTemplateJSON(t *testing.T) { - parser := NewParser() - err := parser.SetTemplates("_", []string{ - "jenkins.* measurement.metric.metric.field", - "vm.* measurement.measurement.pool.field", - }) - require.NoError(t, err) + parser := &Parser{ + Separator: "_", + Templates: []string{ + "jenkins.* measurement.metric.metric.field", + "vm.* measurement.measurement.pool.field", + }, + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(sampleTemplateJSON)) require.NoError(t, err) @@ -498,16 +514,12 @@ func containsAll(t1 map[string]string, t2 map[string]string) bool { return true } -func NoError(t *testing.T, err error) { - require.NoError(t, err) -} - func TestDropWizard(t *testing.T) { tests := []struct { - name string - input []byte - metrics []telegraf.Metric - errFunc func(t *testing.T, err error) + name string + input []byte + metrics []telegraf.Metric + expectError bool }{ { name: "minimal", @@ -521,10 +533,9 @@ func TestDropWizard(t *testing.T) { map[string]interface{}{ "value": 42.0, }, - testTimeFunc(), + time.Unix(0, 0), ), }, - errFunc: NoError, }, { name: "name with space unescaped", @@ -538,17 +549,14 @@ func TestDropWizard(t *testing.T) { map[string]interface{}{ "value": 42.0, }, - testTimeFunc(), + time.Unix(0, 0), ), }, - errFunc: NoError, }, { - name: "name with space single slash escaped is not valid JSON", - input: []byte(`{"version": "3.0.0", "counters": {"hello\ world": {"value": 42}}}`), - errFunc: func(t *testing.T, err error) { - require.Error(t, err) - }, + name: "name with space single slash escaped is not valid JSON", + input: []byte(`{"version": "3.0.0", "counters": {"hello\ world": {"value": 42}}}`), + expectError: true, }, { name: "name with space double slash escape", @@ -562,27 +570,23 @@ func TestDropWizard(t *testing.T) { map[string]interface{}{ "value": 42.0, }, - testTimeFunc(), + time.Unix(0, 0), ), }, - errFunc: NoError, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - parser := NewParser() - parser.SetTimeFunc(testTimeFunc) + parser := &Parser{} + require.NoError(t, parser.Init()) metrics, err := parser.Parse(tt.input) - tt.errFunc(t, err) - - require.Equal(t, len(tt.metrics), len(metrics)) - for i, expected := range tt.metrics { - require.Equal(t, expected.Name(), metrics[i].Name()) - require.Equal(t, expected.Tags(), metrics[i].Tags()) - require.Equal(t, expected.Fields(), metrics[i].Fields()) - require.Equal(t, expected.Time(), metrics[i].Time()) + if tt.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) } + testutil.RequireMetricsEqual(t, tt.metrics, metrics, testutil.IgnoreTime()) }) } } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 3391288ee..368b34ee0 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" @@ -207,16 +206,6 @@ func NewParser(config *Config) (Parser, error) { } case "nagios": parser, err = NewNagiosParser() - case "dropwizard": - parser, err = NewDropwizardParser( - config.DropwizardMetricRegistryPath, - config.DropwizardTimePath, - config.DropwizardTimeFormat, - config.DropwizardTagsPath, - config.DropwizardTagPathsMap, - config.DefaultTags, - config.Separator, - config.Templates) case "grok": parser, err = newGrokParser( config.MetricName, @@ -282,31 +271,6 @@ func NewInfluxUpstreamParser() (Parser, error) { return influx_upstream.NewParser(), nil } -func NewDropwizardParser( - metricRegistryPath string, - timePath string, - timeFormat string, - tagsPath string, - tagPathsMap map[string]string, - defaultTags map[string]string, - separator string, - templates []string, - -) (Parser, error) { - parser := dropwizard.NewParser() - parser.MetricRegistryPath = metricRegistryPath - parser.TimePath = timePath - parser.TimeFormat = timeFormat - parser.TagsPath = tagsPath - parser.TagPathsMap = tagPathsMap - parser.DefaultTags = defaultTags - err := parser.SetTemplates(separator, templates) - if err != nil { - return nil, err - } - return parser, err -} - func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) { return &prometheus.Parser{ DefaultTags: defaultTags, diff --git a/plugins/parsers/registry_test.go b/plugins/parsers/registry_test.go index 20c94b013..0ae77ed41 100644 --- a/plugins/parsers/registry_test.go +++ b/plugins/parsers/registry_test.go @@ -77,7 +77,7 @@ func TestRegistry_BackwardCompatibility(t *testing.T) { options = append(options, cmpopts.IgnoreFields(stype, settings.mask...)) } - // Do a manual comparision as require.EqualValues will also work on unexported fields + // Do a manual comparison as require.EqualValues will also work on unexported fields // that cannot be cleared or ignored. diff := cmp.Diff(expected, actual, options...) require.Emptyf(t, diff, "Difference for %q", name)