chore(processors): Convert processors to `ParserPlugin`s (#11600)

This commit is contained in:
Sven Rebhan 2022-09-26 22:24:34 +02:00 committed by GitHub
parent d982ed9a45
commit 8a9c2eec85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 337 additions and 123 deletions

View File

@ -687,12 +687,12 @@ func (c *Config) probeParser(table *ast.Table) bool {
return ok
}
func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
if dataformat == "" {
if parentname == "exec" {
if parentcategory == "inputs" && parentname == "exec" {
// Legacy support, exec plugin originally parsed JSON by default.
dataformat = "json"
} else {
@ -735,51 +735,82 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
}
return fmt.Errorf("Undefined but requested processor: %s", name)
}
streamingProcessor := creator()
// For processors with parsers we need to compute the set of
// options that is not covered by both, the parser and the processor.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
processorConfig, err := c.buildProcessor(name, table)
if err != nil {
return err
}
rf, err := c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
var processor interface{}
processor = streamingProcessor
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
}
// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("processors", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}
if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
}
// Setup the processor
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}
rf := models.NewRunningProcessor(streamingProcessor, processorConfig)
c.Processors = append(c.Processors, rf)
// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
// Save a copy for the aggregator
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}
rf = models.NewRunningProcessor(streamingProcessor, processorConfig)
c.AggProcessors = append(c.AggProcessors, rf)
return nil
}
func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()
func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error {
if p, ok := processor.(unwrappable); ok {
if err := c.toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, err
unwrapped := p.Unwrap()
if err := c.toml.UnmarshalTable(table, unwrapped); err != nil {
return fmt.Errorf("unmarshalling unwrappable failed: %w", err)
}
return c.printUserDeprecation("processors", name, unwrapped)
}
if err := c.printUserDeprecation("processors", processorConfig.Name, processor); err != nil {
return nil, err
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return fmt.Errorf("unmarshalling failed: %w", err)
}
rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
return c.printUserDeprecation("processors", name, processor)
}
func (c *Config) addOutput(name string, table *ast.Table) error {
@ -860,8 +891,8 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
parser, err := c.addParser(name, table)
if t, ok := input.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
@ -870,30 +901,30 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
parser, err := c.addParser(name, table)
// DEPRECATED: Please switch your plugin to telegraf.ParserPlugin.
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}
if t, ok := input.(telegraf.ParserFuncInput); ok {
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser(name, table)
return c.addParser("inputs", name, table)
})
}
if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (parsers.Parser, error) {
return c.addParser(name, table)
return c.addParser("inputs", name, table)
})
}

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/processors"
)
func TestReadBinaryFile(t *testing.T) {
@ -654,6 +655,128 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
}
}
func TestConfig_ProcessorsWithParsers(t *testing.T) {
formats := []string{
"collectd",
"csv",
"dropwizard",
"form_urlencoded",
"graphite",
"grok",
"influx",
"json",
"json_v2",
"logfmt",
"nagios",
"prometheus",
"prometheusremotewrite",
"value",
"wavefront",
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/processors_with_parsers.toml"))
require.Len(t, c.Processors, len(formats))
override := map[string]struct {
param map[string]interface{}
mask []string
}{
"csv": {
param: map[string]interface{}{
"HeaderRowCount": 42,
},
mask: []string{"TimeFunc", "ResetMode"},
},
"xpath_protobuf": {
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
"ProtobufMessageType": "addressbook.AddressBook",
},
},
}
expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
logger := models.NewLogger("parsers", format, "processors_with_parsers")
creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)
parser := creator("parser_test")
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parser))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parser, logger)
if p, ok := parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parser)
}
require.Len(t, expected, len(formats))
actual := make([]interface{}, 0)
generated := make([]interface{}, 0)
for _, plugin := range c.Processors {
var processorIF telegraf.Processor
if p, ok := plugin.Processor.(unwrappable); ok {
processorIF = p.Unwrap()
} else {
processorIF = plugin.Processor.(telegraf.Processor)
}
require.NotNil(t, processorIF)
processor, ok := processorIF.(*MockupProcessorPluginParser)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := processor.Parser.(*models.RunningParser); ok {
actual = append(actual, p.Parser)
} else {
actual = append(actual, processor.Parser)
}
// Get the parser set with 'SetParserFunc()'
if processor.ParserFunc != nil {
g, err := processor.ParserFunc()
require.NoError(t, err)
if rp, ok := g.(*models.RunningParser); ok {
generated = append(generated, rp.Parser)
} else {
generated = append(generated, g)
}
} else {
generated = append(generated, nil)
}
}
require.Len(t, actual, len(formats))
for i, format := range formats {
// Determine the underlying type of the parser
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreTypes(sync.Mutex{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
if settings, found := override[format]; found {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}
// Do a manual comparision as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetParser() for %q", format)
diff = cmp.Diff(expected[i], generated[i], options...)
require.Emptyf(t, diff, "Difference in SetParserFunc() for %q", format)
}
}
/*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/
type MockupInputPluginParserOld struct {
Parser parsers.Parser
@ -698,6 +821,24 @@ func (m *MockupInputPlugin) SampleConfig() string { return "Moc
func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { return nil }
func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { m.parser = parser }
/*** Mockup PROCESSOR plugin for testing to avoid cyclic dependencies ***/
type MockupProcessorPluginParser struct {
Parser telegraf.Parser
ParserFunc telegraf.ParserFunc
}
func (m *MockupProcessorPluginParser) Start(acc telegraf.Accumulator) error { return nil }
func (m *MockupProcessorPluginParser) Stop() error { return nil }
func (m *MockupProcessorPluginParser) SampleConfig() string {
return "Mockup test processor plugin with parser"
}
func (m *MockupProcessorPluginParser) Apply(in ...telegraf.Metric) []telegraf.Metric { return nil }
func (m *MockupProcessorPluginParser) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
return nil
}
func (m *MockupProcessorPluginParser) SetParser(parser telegraf.Parser) { m.Parser = parser }
func (m *MockupProcessorPluginParser) SetParserFunc(f telegraf.ParserFunc) { m.ParserFunc = f }
/*** Mockup OUTPUT plugin for testing to avoid cyclic dependencies ***/
type MockupOuputPlugin struct {
URL string `toml:"url"`
@ -723,6 +864,9 @@ func init() {
inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} })
inputs.Add("procstat", func() telegraf.Input { return &MockupInputPlugin{} })
// Register the mockup output plugin for the required names
processors.Add("parser_test", func() telegraf.Processor { return &MockupProcessorPluginParser{} })
// Register the mockup output plugin for the required names
outputs.Add("azure_monitor", func() telegraf.Output { return &MockupOuputPlugin{NamespacePrefix: "Telegraf/"} })
outputs.Add("http", func() telegraf.Output { return &MockupOuputPlugin{} })

View File

@ -0,0 +1,60 @@
[[processors.parser_test]]
data_format = "collectd"
[[processors.parser_test]]
data_format = "csv"
csv_header_row_count = 42
[[processors.parser_test]]
data_format = "dropwizard"
[[processors.parser_test]]
data_format = "form_urlencoded"
[[processors.parser_test]]
data_format = "graphite"
[[processors.parser_test]]
data_format = "grok"
grok_patterns = ["%{COMBINED_LOG_FORMAT}"]
[[processors.parser_test]]
data_format = "influx"
[[processors.parser_test]]
data_format = "json"
[[processors.parser_test]]
data_format = "json_v2"
[[processors.parser_test]]
data_format = "logfmt"
[[processors.parser_test]]
data_format = "nagios"
[[processors.parser_test]]
data_format = "prometheus"
[[processors.parser_test]]
data_format = "prometheusremotewrite"
[[processors.parser_test]]
data_format = "value"
[[processors.parser_test]]
data_format = "wavefront"
[[processors.parser_test]]
data_format = "xml"
[[processors.parser_test]]
data_format = "xpath_json"
[[processors.parser_test]]
data_format = "xpath_msgpack"
[[processors.parser_test]]
data_format = "xpath_protobuf"
xpath_protobuf_file = "testdata/addressbook.proto"
xpath_protobuf_type = "addressbook.AddressBook"

View File

@ -24,16 +24,16 @@ type Parser interface {
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse
// ParserPlugin is an interface for plugins that are able to parse
// arbitrary data formats.
type ParserInput interface {
type ParserPlugin interface {
// SetParser sets the parser function for the interface
SetParser(parser Parser)
}
// ParserFuncInput is an interface for input plugins that are able to parse
// ParserFuncPlugin is an interface for plugins that are able to parse
// arbitrary data formats.
type ParserFuncInput interface {
type ParserFuncPlugin interface {
// GetParser returns a new parser.
SetParserFunc(fn ParserFunc)
}

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
@ -28,8 +27,7 @@ type Execd struct {
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger
parserConfig *parsers.Config
parser parsers.Parser
parser telegraf.Parser
serializerConfig *serializers.Config
serializer serializers.Serializer
acc telegraf.Accumulator
@ -39,25 +37,22 @@ type Execd struct {
func New() *Execd {
return &Execd{
RestartDelay: config.Duration(10 * time.Second),
parserConfig: &parsers.Config{
DataFormat: "influx",
},
serializerConfig: &serializers.Config{
DataFormat: "influx",
},
}
}
func (e *Execd) SetParser(p telegraf.Parser) {
e.parser = p
}
func (*Execd) SampleConfig() string {
return sampleConfig
}
func (e *Execd) Start(acc telegraf.Accumulator) error {
var err error
e.parser, err = parsers.NewParser(e.parserConfig)
if err != nil {
return fmt.Errorf("error creating parser: %w", err)
}
e.serializer, err = serializers.NewSerializer(e.serializerConfig)
if err != nil {
return fmt.Errorf("error creating serializer: %w", err)

View File

@ -20,6 +20,10 @@ func TestExternalProcessorWorks(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
e.SetParser(parser)
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
@ -81,6 +85,10 @@ func TestParseLinesWithNewLines(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
e.SetParser(parser)
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)

View File

@ -5,8 +5,6 @@ import (
_ "embed"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)
@ -14,7 +12,6 @@ import (
var sampleConfig string
type Parser struct {
parsers.Config
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
@ -27,17 +24,11 @@ func (*Parser) SampleConfig() string {
return sampleConfig
}
func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if p.parser == nil {
var err error
p.parser, err = parsers.NewParser(&p.Config)
if err != nil {
p.Log.Errorf("could not create parser: %v", err)
return metrics
}
models.SetLoggerOnPlugin(p.parser, p.Log)
}
func (p *Parser) SetParser(parser telegraf.Parser) {
p.parser = parser
}
func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
results := []telegraf.Metric{}
for _, metric := range metrics {

View File

@ -6,10 +6,14 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/stretchr/testify/require"
//Blank import to register all new-style parsers
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/testutil"
)
@ -19,7 +23,7 @@ func TestApply(t *testing.T) {
name string
parseFields []string
parseTags []string
config parsers.Config
parser telegraf.Parser
dropOriginal bool
merge string
input telegraf.Metric
@ -29,8 +33,7 @@ func TestApply(t *testing.T) {
name: "parse one field drop original",
parseFields: []string{"sample"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "json",
parser: &json.Parser{
TagKeys: []string{
"ts",
"lvl",
@ -65,8 +68,7 @@ func TestApply(t *testing.T) {
parseFields: []string{"sample"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "json",
parser: &json.Parser{
TagKeys: []string{
"ts",
"lvl",
@ -103,8 +105,7 @@ func TestApply(t *testing.T) {
name: "parse one field keep",
parseFields: []string{"sample"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "json",
parser: &json.Parser{
TagKeys: []string{
"ts",
"lvl",
@ -144,11 +145,9 @@ func TestApply(t *testing.T) {
},
},
{
name: "parse one field keep with measurement name",
parseFields: []string{"message"},
config: parsers.Config{
DataFormat: "influx",
},
name: "parse one field keep with measurement name",
parseFields: []string{"message"},
parser: &influx.Parser{},
dropOriginal: false,
input: metric.New(
"influxField",
@ -181,9 +180,7 @@ func TestApply(t *testing.T) {
parseFields: []string{"message"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "influx",
},
parser: &influx.Parser{},
input: metric.New(
"influxField",
map[string]string{
@ -210,9 +207,8 @@ func TestApply(t *testing.T) {
name: "parse grok field",
parseFields: []string{"grokSample"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "grok",
GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"},
parser: &grok.Parser{
Patterns: []string{"%{COMBINED_LOG_FORMAT}"},
},
input: metric.New(
"success",
@ -245,9 +241,8 @@ func TestApply(t *testing.T) {
name: "parse two fields [replace]",
parseFields: []string{"field_1", "field_2"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl", "err"},
parser: &json.Parser{
TagKeys: []string{"lvl", "err"},
},
input: metric.New(
"bigMeasure",
@ -279,9 +274,8 @@ func TestApply(t *testing.T) {
parseFields: []string{"field_1", "field_2"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl", "msg", "err", "fatal"},
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
@ -311,9 +305,8 @@ func TestApply(t *testing.T) {
name: "parse two fields [keep]",
parseFields: []string{"field_1", "field_2"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl", "msg", "err", "fatal"},
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
@ -354,9 +347,7 @@ func TestApply(t *testing.T) {
name: "parse one tag drop original",
parseTags: []string{"sample"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "logfmt",
},
parser: &logfmt.Parser{},
input: metric.New(
"singleTag",
map[string]string{
@ -380,9 +371,7 @@ func TestApply(t *testing.T) {
parseTags: []string{"sample"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "logfmt",
},
parser: &logfmt.Parser{},
input: metric.New(
"singleTag",
map[string]string{
@ -408,9 +397,7 @@ func TestApply(t *testing.T) {
name: "parse one tag keep",
parseTags: []string{"sample"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "logfmt",
},
parser: &logfmt.Parser{},
input: metric.New(
"singleTag",
map[string]string{
@ -441,9 +428,8 @@ func TestApply(t *testing.T) {
name: "Fail to parse one field but parses other [keep]",
parseFields: []string{"good", "bad"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl"},
parser: &json.Parser{
TagKeys: []string{"lvl"},
},
input: metric.New(
"success",
@ -475,9 +461,8 @@ func TestApply(t *testing.T) {
name: "Fail to parse one field but parses other [keep] v2",
parseFields: []string{"bad", "good", "ok"},
dropOriginal: false,
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl", "thing"},
parser: &json.Parser{
TagKeys: []string{"lvl", "thing"},
},
input: metric.New(
"success",
@ -519,9 +504,8 @@ func TestApply(t *testing.T) {
parseFields: []string{"good", "bad"},
dropOriginal: false,
merge: "override",
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl"},
parser: &json.Parser{
TagKeys: []string{"lvl"},
},
input: metric.New(
"success",
@ -551,9 +535,8 @@ func TestApply(t *testing.T) {
name: "Fail to parse one field but parses other [replace]",
parseFields: []string{"good", "bad"},
dropOriginal: true,
config: parsers.Config{
DataFormat: "json",
TagKeys: []string{"lvl"},
parser: &json.Parser{
TagKeys: []string{"lvl"},
},
input: metric.New(
"success",
@ -579,16 +562,19 @@ func TestApply(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := Parser{
Config: tt.config,
if p, ok := tt.parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
plugin := Parser{
ParseFields: tt.parseFields,
ParseTags: tt.parseTags,
DropOriginal: tt.dropOriginal,
Merge: tt.merge,
Log: testutil.Logger{Name: "processor.parser"},
}
plugin.SetParser(tt.parser)
output := parser.Apply(tt.input)
output := plugin.Apply(tt.input)
t.Logf("Testing: %s", tt.name)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
@ -599,16 +585,14 @@ func TestBadApply(t *testing.T) {
tests := []struct {
name string
parseFields []string
config parsers.Config
parser telegraf.Parser
input telegraf.Metric
expected []telegraf.Metric
}{
{
name: "field not found",
parseFields: []string{"bad_field"},
config: parsers.Config{
DataFormat: "json",
},
parser: &json.Parser{},
input: metric.New(
"bad",
map[string]string{},
@ -629,9 +613,7 @@ func TestBadApply(t *testing.T) {
{
name: "non string field",
parseFields: []string{"some_field"},
config: parsers.Config{
DataFormat: "json",
},
parser: &json.Parser{},
input: metric.New(
"bad",
map[string]string{},
@ -653,14 +635,17 @@ func TestBadApply(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := Parser{
Config: tt.config,
if p, ok := tt.parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
plugin := Parser{
ParseFields: tt.parseFields,
Log: testutil.Logger{Name: "processor.parser"},
}
plugin.SetParser(tt.parser)
output := parser.Apply(tt.input)
output := plugin.Apply(tt.input)
testutil.RequireMetricsEqual(t, tt.expected, output, testutil.IgnoreTime())
})
}