From 8a9c2eec853b283649269cdce31501018f9adba8 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 26 Sep 2022 22:24:34 +0200 Subject: [PATCH] chore(processors): Convert processors to `ParserPlugin`s (#11600) --- config/config.go | 95 +++++++----- config/config_test.go | 144 +++++++++++++++++++ config/testdata/processors_with_parsers.toml | 60 ++++++++ parser.go | 8 +- plugins/processors/execd/execd.go | 15 +- plugins/processors/execd/execd_test.go | 8 ++ plugins/processors/parser/parser.go | 17 +-- plugins/processors/parser/parser_test.go | 113 +++++++-------- 8 files changed, 337 insertions(+), 123 deletions(-) create mode 100644 config/testdata/processors_with_parsers.toml diff --git a/config/config.go b/config/config.go index ac2d9570c..a2bba393a 100644 --- a/config/config.go +++ b/config/config.go @@ -687,12 +687,12 @@ func (c *Config) probeParser(table *ast.Table) bool { return ok } -func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) { +func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) { var dataformat string c.getFieldString(table, "data_format", &dataformat) if dataformat == "" { - if parentname == "exec" { + if parentcategory == "inputs" && parentname == "exec" { // Legacy support, exec plugin originally parsed JSON by default. dataformat = "json" } else { @@ -735,51 +735,82 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { } return fmt.Errorf("Undefined but requested processor: %s", name) } + streamingProcessor := creator() + + // For processors with parsers we need to compute the set of + // options that is not covered by both, the parser and the processor. + // We achieve this by keeping a local book of missing entries + // that counts the number of misses. In case we have a parser + // for the input both need to miss the entry. We count the + // missing entries at the end. + missCount := make(map[string]int) + c.setLocalMissingTomlFieldTracker(missCount) + defer c.resetMissingTomlFieldTracker() processorConfig, err := c.buildProcessor(name, table) if err != nil { return err } - rf, err := c.newRunningProcessor(creator, processorConfig, table) - if err != nil { + var processor interface{} + processor = streamingProcessor + if p, ok := streamingProcessor.(unwrappable); ok { + processor = p.Unwrap() + } + + // If the (underlying) processor has a SetParser or SetParserFunc function, + // it can accept arbitrary data-formats, so build the requested parser and + // set it. + if t, ok := processor.(telegraf.ParserPlugin); ok { + parser, err := c.addParser("processors", name, table) + if err != nil { + return fmt.Errorf("adding parser failed: %w", err) + } + t.SetParser(parser) + } + + if t, ok := processor.(telegraf.ParserFuncPlugin); ok { + if !c.probeParser(table) { + return errors.New("parser not found") + } + t.SetParserFunc(func() (telegraf.Parser, error) { + return c.addParser("processors", name, table) + }) + } + + // Setup the processor + if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil { return err } + + rf := models.NewRunningProcessor(streamingProcessor, processorConfig) c.Processors = append(c.Processors, rf) - // save a copy for the aggregator - rf, err = c.newRunningProcessor(creator, processorConfig, table) - if err != nil { + // Save a copy for the aggregator + if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil { return err } + + rf = models.NewRunningProcessor(streamingProcessor, processorConfig) c.AggProcessors = append(c.AggProcessors, rf) return nil } -func (c *Config) newRunningProcessor( - creator processors.StreamingCreator, - processorConfig *models.ProcessorConfig, - table *ast.Table, -) (*models.RunningProcessor, error) { - processor := creator() - +func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error { if p, ok := processor.(unwrappable); ok { - if err := c.toml.UnmarshalTable(table, p.Unwrap()); err != nil { - return nil, err - } - } else { - if err := c.toml.UnmarshalTable(table, processor); err != nil { - return nil, err + unwrapped := p.Unwrap() + if err := c.toml.UnmarshalTable(table, unwrapped); err != nil { + return fmt.Errorf("unmarshalling unwrappable failed: %w", err) } + return c.printUserDeprecation("processors", name, unwrapped) } - if err := c.printUserDeprecation("processors", processorConfig.Name, processor); err != nil { - return nil, err + if err := c.toml.UnmarshalTable(table, processor); err != nil { + return fmt.Errorf("unmarshalling failed: %w", err) } - rf := models.NewRunningProcessor(processor, processorConfig) - return rf, nil + return c.printUserDeprecation("processors", name, processor) } func (c *Config) addOutput(name string, table *ast.Table) error { @@ -860,8 +891,8 @@ func (c *Config) addInput(name string, table *ast.Table) error { // If the input has a SetParser or SetParserFunc function, it can accept // arbitrary data-formats, so build the requested parser and set it. - if t, ok := input.(telegraf.ParserInput); ok { - parser, err := c.addParser(name, table) + if t, ok := input.(telegraf.ParserPlugin); ok { + parser, err := c.addParser("inputs", name, table) if err != nil { return fmt.Errorf("adding parser failed: %w", err) } @@ -870,30 +901,30 @@ func (c *Config) addInput(name string, table *ast.Table) error { // Keep the old interface for backward compatibility if t, ok := input.(parsers.ParserInput); ok { - // DEPRECATED: Please switch your plugin to telegraf.ParserInput. - parser, err := c.addParser(name, table) + // DEPRECATED: Please switch your plugin to telegraf.ParserPlugin. + parser, err := c.addParser("inputs", name, table) if err != nil { return fmt.Errorf("adding parser failed: %w", err) } t.SetParser(parser) } - if t, ok := input.(telegraf.ParserFuncInput); ok { + if t, ok := input.(telegraf.ParserFuncPlugin); ok { if !c.probeParser(table) { return errors.New("parser not found") } t.SetParserFunc(func() (telegraf.Parser, error) { - return c.addParser(name, table) + return c.addParser("inputs", name, table) }) } if t, ok := input.(parsers.ParserFuncInput); ok { - // DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput. + // DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin. if !c.probeParser(table) { return errors.New("parser not found") } t.SetParserFunc(func() (parsers.Parser, error) { - return c.addParser(name, table) + return c.addParser("inputs", name, table) }) } diff --git a/config/config_test.go b/config/config_test.go index 066410587..35decf216 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" _ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/processors" ) func TestReadBinaryFile(t *testing.T) { @@ -654,6 +655,128 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { } } +func TestConfig_ProcessorsWithParsers(t *testing.T) { + formats := []string{ + "collectd", + "csv", + "dropwizard", + "form_urlencoded", + "graphite", + "grok", + "influx", + "json", + "json_v2", + "logfmt", + "nagios", + "prometheus", + "prometheusremotewrite", + "value", + "wavefront", + "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf", + } + + c := NewConfig() + require.NoError(t, c.LoadConfig("./testdata/processors_with_parsers.toml")) + require.Len(t, c.Processors, len(formats)) + + override := map[string]struct { + param map[string]interface{} + mask []string + }{ + "csv": { + param: map[string]interface{}{ + "HeaderRowCount": 42, + }, + mask: []string{"TimeFunc", "ResetMode"}, + }, + "xpath_protobuf": { + param: map[string]interface{}{ + "ProtobufMessageDef": "testdata/addressbook.proto", + "ProtobufMessageType": "addressbook.AddressBook", + }, + }, + } + + expected := make([]telegraf.Parser, 0, len(formats)) + for _, format := range formats { + logger := models.NewLogger("parsers", format, "processors_with_parsers") + + creator, found := parsers.Parsers[format] + require.Truef(t, found, "No parser for format %q", format) + + parser := creator("parser_test") + if settings, found := override[format]; found { + s := reflect.Indirect(reflect.ValueOf(parser)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + models.SetLoggerOnPlugin(parser, logger) + if p, ok := parser.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + expected = append(expected, parser) + } + require.Len(t, expected, len(formats)) + + actual := make([]interface{}, 0) + generated := make([]interface{}, 0) + for _, plugin := range c.Processors { + var processorIF telegraf.Processor + if p, ok := plugin.Processor.(unwrappable); ok { + processorIF = p.Unwrap() + } else { + processorIF = plugin.Processor.(telegraf.Processor) + } + require.NotNil(t, processorIF) + + processor, ok := processorIF.(*MockupProcessorPluginParser) + require.True(t, ok) + + // Get the parser set with 'SetParser()' + if p, ok := processor.Parser.(*models.RunningParser); ok { + actual = append(actual, p.Parser) + } else { + actual = append(actual, processor.Parser) + } + // Get the parser set with 'SetParserFunc()' + if processor.ParserFunc != nil { + g, err := processor.ParserFunc() + require.NoError(t, err) + if rp, ok := g.(*models.RunningParser); ok { + generated = append(generated, rp.Parser) + } else { + generated = append(generated, g) + } + } else { + generated = append(generated, nil) + } + } + require.Len(t, actual, len(formats)) + + for i, format := range formats { + // Determine the underlying type of the parser + stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface() + // Ignore all unexported fields and fields not relevant for functionality + options := []cmp.Option{ + cmpopts.IgnoreUnexported(stype), + cmpopts.IgnoreTypes(sync.Mutex{}), + cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), + } + if settings, found := override[format]; found { + options = append(options, cmpopts.IgnoreFields(stype, settings.mask...)) + } + + // Do a manual comparision as require.EqualValues will also work on unexported fields + // that cannot be cleared or ignored. + diff := cmp.Diff(expected[i], actual[i], options...) + require.Emptyf(t, diff, "Difference in SetParser() for %q", format) + diff = cmp.Diff(expected[i], generated[i], options...) + require.Emptyf(t, diff, "Difference in SetParserFunc() for %q", format) + } +} + /*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/ type MockupInputPluginParserOld struct { Parser parsers.Parser @@ -698,6 +821,24 @@ func (m *MockupInputPlugin) SampleConfig() string { return "Moc func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { return nil } func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { m.parser = parser } +/*** Mockup PROCESSOR plugin for testing to avoid cyclic dependencies ***/ +type MockupProcessorPluginParser struct { + Parser telegraf.Parser + ParserFunc telegraf.ParserFunc +} + +func (m *MockupProcessorPluginParser) Start(acc telegraf.Accumulator) error { return nil } +func (m *MockupProcessorPluginParser) Stop() error { return nil } +func (m *MockupProcessorPluginParser) SampleConfig() string { + return "Mockup test processor plugin with parser" +} +func (m *MockupProcessorPluginParser) Apply(in ...telegraf.Metric) []telegraf.Metric { return nil } +func (m *MockupProcessorPluginParser) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + return nil +} +func (m *MockupProcessorPluginParser) SetParser(parser telegraf.Parser) { m.Parser = parser } +func (m *MockupProcessorPluginParser) SetParserFunc(f telegraf.ParserFunc) { m.ParserFunc = f } + /*** Mockup OUTPUT plugin for testing to avoid cyclic dependencies ***/ type MockupOuputPlugin struct { URL string `toml:"url"` @@ -723,6 +864,9 @@ func init() { inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} }) inputs.Add("procstat", func() telegraf.Input { return &MockupInputPlugin{} }) + // Register the mockup output plugin for the required names + processors.Add("parser_test", func() telegraf.Processor { return &MockupProcessorPluginParser{} }) + // Register the mockup output plugin for the required names outputs.Add("azure_monitor", func() telegraf.Output { return &MockupOuputPlugin{NamespacePrefix: "Telegraf/"} }) outputs.Add("http", func() telegraf.Output { return &MockupOuputPlugin{} }) diff --git a/config/testdata/processors_with_parsers.toml b/config/testdata/processors_with_parsers.toml new file mode 100644 index 000000000..71022210c --- /dev/null +++ b/config/testdata/processors_with_parsers.toml @@ -0,0 +1,60 @@ +[[processors.parser_test]] + data_format = "collectd" + +[[processors.parser_test]] + data_format = "csv" + csv_header_row_count = 42 + +[[processors.parser_test]] + data_format = "dropwizard" + +[[processors.parser_test]] + data_format = "form_urlencoded" + +[[processors.parser_test]] + data_format = "graphite" + +[[processors.parser_test]] + data_format = "grok" + grok_patterns = ["%{COMBINED_LOG_FORMAT}"] + +[[processors.parser_test]] + data_format = "influx" + +[[processors.parser_test]] + data_format = "json" + +[[processors.parser_test]] + data_format = "json_v2" + +[[processors.parser_test]] + data_format = "logfmt" + +[[processors.parser_test]] + data_format = "nagios" + +[[processors.parser_test]] + data_format = "prometheus" + +[[processors.parser_test]] + data_format = "prometheusremotewrite" + +[[processors.parser_test]] + data_format = "value" + +[[processors.parser_test]] + data_format = "wavefront" + +[[processors.parser_test]] + data_format = "xml" + +[[processors.parser_test]] + data_format = "xpath_json" + +[[processors.parser_test]] + data_format = "xpath_msgpack" + +[[processors.parser_test]] + data_format = "xpath_protobuf" + xpath_protobuf_file = "testdata/addressbook.proto" + xpath_protobuf_type = "addressbook.AddressBook" diff --git a/parser.go b/parser.go index 1112fa211..5d67de987 100644 --- a/parser.go +++ b/parser.go @@ -24,16 +24,16 @@ type Parser interface { type ParserFunc func() (Parser, error) -// ParserInput is an interface for input plugins that are able to parse +// ParserPlugin is an interface for plugins that are able to parse // arbitrary data formats. -type ParserInput interface { +type ParserPlugin interface { // SetParser sets the parser function for the interface SetParser(parser Parser) } -// ParserFuncInput is an interface for input plugins that are able to parse +// ParserFuncPlugin is an interface for plugins that are able to parse // arbitrary data formats. -type ParserFuncInput interface { +type ParserFuncPlugin interface { // GetParser returns a new parser. SetParserFunc(fn ParserFunc) } diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index cbccb7dd4..5ab7ff53a 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/process" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" @@ -28,8 +27,7 @@ type Execd struct { RestartDelay config.Duration `toml:"restart_delay"` Log telegraf.Logger - parserConfig *parsers.Config - parser parsers.Parser + parser telegraf.Parser serializerConfig *serializers.Config serializer serializers.Serializer acc telegraf.Accumulator @@ -39,25 +37,22 @@ type Execd struct { func New() *Execd { return &Execd{ RestartDelay: config.Duration(10 * time.Second), - parserConfig: &parsers.Config{ - DataFormat: "influx", - }, serializerConfig: &serializers.Config{ DataFormat: "influx", }, } } +func (e *Execd) SetParser(p telegraf.Parser) { + e.parser = p +} + func (*Execd) SampleConfig() string { return sampleConfig } func (e *Execd) Start(acc telegraf.Accumulator) error { var err error - e.parser, err = parsers.NewParser(e.parserConfig) - if err != nil { - return fmt.Errorf("error creating parser: %w", err) - } e.serializer, err = serializers.NewSerializer(e.serializerConfig) if err != nil { return fmt.Errorf("error creating serializer: %w", err) diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 48fd83d36..215d72465 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -20,6 +20,10 @@ func TestExternalProcessorWorks(t *testing.T) { e := New() e.Log = testutil.Logger{} + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + e.SetParser(parser) + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) @@ -81,6 +85,10 @@ func TestParseLinesWithNewLines(t *testing.T) { e := New() e.Log = testutil.Logger{} + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + e.SetParser(parser) + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go index 11eb958f5..2ed0bb42a 100644 --- a/plugins/processors/parser/parser.go +++ b/plugins/processors/parser/parser.go @@ -5,8 +5,6 @@ import ( _ "embed" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/models" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/processors" ) @@ -14,7 +12,6 @@ import ( var sampleConfig string type Parser struct { - parsers.Config DropOriginal bool `toml:"drop_original"` Merge string `toml:"merge"` ParseFields []string `toml:"parse_fields"` @@ -27,17 +24,11 @@ func (*Parser) SampleConfig() string { return sampleConfig } -func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { - if p.parser == nil { - var err error - p.parser, err = parsers.NewParser(&p.Config) - if err != nil { - p.Log.Errorf("could not create parser: %v", err) - return metrics - } - models.SetLoggerOnPlugin(p.parser, p.Log) - } +func (p *Parser) SetParser(parser telegraf.Parser) { + p.parser = parser +} +func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { results := []telegraf.Metric{} for _, metric := range metrics { diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go index 112f7ae39..804cb02d7 100644 --- a/plugins/processors/parser/parser_test.go +++ b/plugins/processors/parser/parser_test.go @@ -6,10 +6,14 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/stretchr/testify/require" //Blank import to register all new-style parsers - _ "github.com/influxdata/telegraf/plugins/parsers/all" + + "github.com/influxdata/telegraf/plugins/parsers/grok" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/testutil" ) @@ -19,7 +23,7 @@ func TestApply(t *testing.T) { name string parseFields []string parseTags []string - config parsers.Config + parser telegraf.Parser dropOriginal bool merge string input telegraf.Metric @@ -29,8 +33,7 @@ func TestApply(t *testing.T) { name: "parse one field drop original", parseFields: []string{"sample"}, dropOriginal: true, - config: parsers.Config{ - DataFormat: "json", + parser: &json.Parser{ TagKeys: []string{ "ts", "lvl", @@ -65,8 +68,7 @@ func TestApply(t *testing.T) { parseFields: []string{"sample"}, dropOriginal: false, merge: "override", - config: parsers.Config{ - DataFormat: "json", + parser: &json.Parser{ TagKeys: []string{ "ts", "lvl", @@ -103,8 +105,7 @@ func TestApply(t *testing.T) { name: "parse one field keep", parseFields: []string{"sample"}, dropOriginal: false, - config: parsers.Config{ - DataFormat: "json", + parser: &json.Parser{ TagKeys: []string{ "ts", "lvl", @@ -144,11 +145,9 @@ func TestApply(t *testing.T) { }, }, { - name: "parse one field keep with measurement name", - parseFields: []string{"message"}, - config: parsers.Config{ - DataFormat: "influx", - }, + name: "parse one field keep with measurement name", + parseFields: []string{"message"}, + parser: &influx.Parser{}, dropOriginal: false, input: metric.New( "influxField", @@ -181,9 +180,7 @@ func TestApply(t *testing.T) { parseFields: []string{"message"}, dropOriginal: false, merge: "override", - config: parsers.Config{ - DataFormat: "influx", - }, + parser: &influx.Parser{}, input: metric.New( "influxField", map[string]string{ @@ -210,9 +207,8 @@ func TestApply(t *testing.T) { name: "parse grok field", parseFields: []string{"grokSample"}, dropOriginal: true, - config: parsers.Config{ - DataFormat: "grok", - GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"}, + parser: &grok.Parser{ + Patterns: []string{"%{COMBINED_LOG_FORMAT}"}, }, input: metric.New( "success", @@ -245,9 +241,8 @@ func TestApply(t *testing.T) { name: "parse two fields [replace]", parseFields: []string{"field_1", "field_2"}, dropOriginal: true, - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl", "err"}, + parser: &json.Parser{ + TagKeys: []string{"lvl", "err"}, }, input: metric.New( "bigMeasure", @@ -279,9 +274,8 @@ func TestApply(t *testing.T) { parseFields: []string{"field_1", "field_2"}, dropOriginal: false, merge: "override", - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl", "msg", "err", "fatal"}, + parser: &json.Parser{ + TagKeys: []string{"lvl", "msg", "err", "fatal"}, }, input: metric.New( "bigMeasure", @@ -311,9 +305,8 @@ func TestApply(t *testing.T) { name: "parse two fields [keep]", parseFields: []string{"field_1", "field_2"}, dropOriginal: false, - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl", "msg", "err", "fatal"}, + parser: &json.Parser{ + TagKeys: []string{"lvl", "msg", "err", "fatal"}, }, input: metric.New( "bigMeasure", @@ -354,9 +347,7 @@ func TestApply(t *testing.T) { name: "parse one tag drop original", parseTags: []string{"sample"}, dropOriginal: true, - config: parsers.Config{ - DataFormat: "logfmt", - }, + parser: &logfmt.Parser{}, input: metric.New( "singleTag", map[string]string{ @@ -380,9 +371,7 @@ func TestApply(t *testing.T) { parseTags: []string{"sample"}, dropOriginal: false, merge: "override", - config: parsers.Config{ - DataFormat: "logfmt", - }, + parser: &logfmt.Parser{}, input: metric.New( "singleTag", map[string]string{ @@ -408,9 +397,7 @@ func TestApply(t *testing.T) { name: "parse one tag keep", parseTags: []string{"sample"}, dropOriginal: false, - config: parsers.Config{ - DataFormat: "logfmt", - }, + parser: &logfmt.Parser{}, input: metric.New( "singleTag", map[string]string{ @@ -441,9 +428,8 @@ func TestApply(t *testing.T) { name: "Fail to parse one field but parses other [keep]", parseFields: []string{"good", "bad"}, dropOriginal: false, - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl"}, + parser: &json.Parser{ + TagKeys: []string{"lvl"}, }, input: metric.New( "success", @@ -475,9 +461,8 @@ func TestApply(t *testing.T) { name: "Fail to parse one field but parses other [keep] v2", parseFields: []string{"bad", "good", "ok"}, dropOriginal: false, - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl", "thing"}, + parser: &json.Parser{ + TagKeys: []string{"lvl", "thing"}, }, input: metric.New( "success", @@ -519,9 +504,8 @@ func TestApply(t *testing.T) { parseFields: []string{"good", "bad"}, dropOriginal: false, merge: "override", - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl"}, + parser: &json.Parser{ + TagKeys: []string{"lvl"}, }, input: metric.New( "success", @@ -551,9 +535,8 @@ func TestApply(t *testing.T) { name: "Fail to parse one field but parses other [replace]", parseFields: []string{"good", "bad"}, dropOriginal: true, - config: parsers.Config{ - DataFormat: "json", - TagKeys: []string{"lvl"}, + parser: &json.Parser{ + TagKeys: []string{"lvl"}, }, input: metric.New( "success", @@ -579,16 +562,19 @@ func TestApply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - parser := Parser{ - Config: tt.config, + if p, ok := tt.parser.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + plugin := Parser{ ParseFields: tt.parseFields, ParseTags: tt.parseTags, DropOriginal: tt.dropOriginal, Merge: tt.merge, Log: testutil.Logger{Name: "processor.parser"}, } + plugin.SetParser(tt.parser) - output := parser.Apply(tt.input) + output := plugin.Apply(tt.input) t.Logf("Testing: %s", tt.name) testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime()) }) @@ -599,16 +585,14 @@ func TestBadApply(t *testing.T) { tests := []struct { name string parseFields []string - config parsers.Config + parser telegraf.Parser input telegraf.Metric expected []telegraf.Metric }{ { name: "field not found", parseFields: []string{"bad_field"}, - config: parsers.Config{ - DataFormat: "json", - }, + parser: &json.Parser{}, input: metric.New( "bad", map[string]string{}, @@ -629,9 +613,7 @@ func TestBadApply(t *testing.T) { { name: "non string field", parseFields: []string{"some_field"}, - config: parsers.Config{ - DataFormat: "json", - }, + parser: &json.Parser{}, input: metric.New( "bad", map[string]string{}, @@ -653,14 +635,17 @@ func TestBadApply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - parser := Parser{ - Config: tt.config, + if p, ok := tt.parser.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + + plugin := Parser{ ParseFields: tt.parseFields, Log: testutil.Logger{Name: "processor.parser"}, } + plugin.SetParser(tt.parser) - output := parser.Apply(tt.input) - + output := plugin.Apply(tt.input) testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime()) }) }