diff --git a/config/config.go b/config/config.go index 86a74ab95..0289cb835 100644 --- a/config/config.go +++ b/config/config.go @@ -1069,7 +1069,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { if err != nil { return err } - processorBefore, hasParser, err := c.setupProcessor(processorBeforeConfig.Name, creator, table) + processorBefore, count, err := c.setupProcessor(processorBeforeConfig.Name, creator, table) if err != nil { return err } @@ -1088,10 +1088,9 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { rf = models.NewRunningProcessor(processorAfter, processorAfterConfig) c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf}) - // Check the number of misses against the threshold - if hasParser { - missCountThreshold = 2 - } + // Check the number of misses against the threshold. We need to double + // the count as the processor setup is executed twice. + missCountThreshold = 2 * count for key, count := range missCount { if count <= missCountThreshold { continue @@ -1104,8 +1103,8 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { return nil } -func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, bool, error) { - var hasParser bool +func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, int, error) { + var optionTestCount int streamingProcessor := creator() @@ -1122,28 +1121,39 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator if t, ok := processor.(telegraf.ParserPlugin); ok { parser, err := c.addParser("processors", name, table) if err != nil { - return nil, true, fmt.Errorf("adding parser failed: %w", err) + return nil, 0, fmt.Errorf("adding parser failed: %w", err) } t.SetParser(parser) - hasParser = true + optionTestCount++ } if t, ok := processor.(telegraf.ParserFuncPlugin); ok { if !c.probeParser("processors", name, table) { - return nil, false, errors.New("parser not found") + return nil, 0, errors.New("parser not found") } t.SetParserFunc(func() (telegraf.Parser, error) { return c.addParser("processors", name, table) }) - hasParser = true + optionTestCount++ + } + + // If the (underlying) processor has a SetSerializer function it can accept + // arbitrary data-formats, so build the requested serializer and set it. + if t, ok := processor.(telegraf.SerializerPlugin); ok { + serializer, err := c.addSerializer(name, table) + if err != nil { + return nil, 0, fmt.Errorf("adding serializer failed: %w", err) + } + t.SetSerializer(serializer) + optionTestCount++ } if err := c.toml.UnmarshalTable(table, processor); err != nil { - return nil, hasParser, fmt.Errorf("unmarshalling failed: %w", err) + return nil, 0, fmt.Errorf("unmarshalling failed: %w", err) } err := c.printUserDeprecation("processors", name, processor) - return streamingProcessor, hasParser, err + return streamingProcessor, optionTestCount, err } func (c *Config) addOutput(name string, table *ast.Table) error { @@ -1177,32 +1187,20 @@ func (c *Config) addOutput(name string, table *ast.Table) error { // arbitrary types of output, so build the serializer and set it. if t, ok := output.(telegraf.SerializerPlugin); ok { missThreshold = 1 - if serializer, err := c.addSerializer(name, table); err == nil { - t.SetSerializer(serializer) - } else { - missThreshold = 0 - // Fallback to the old way of instantiating the parsers. - serializer, err := c.buildSerializerOld(table) - if err != nil { - return err - } - t.SetSerializer(serializer) + serializer, err := c.addSerializer(name, table) + if err != nil { + return err } + t.SetSerializer(serializer) } else if t, ok := output.(serializers.SerializerOutput); ok { // Keep the old interface for backward compatibility // DEPRECATED: Please switch your plugin to telegraf.Serializers missThreshold = 1 - if serializer, err := c.addSerializer(name, table); err == nil { - t.SetSerializer(serializer) - } else { - missThreshold = 0 - // Fallback to the old way of instantiating the parsers. - serializer, err := c.buildSerializerOld(table) - if err != nil { - return err - } - t.SetSerializer(serializer) + serializer, err := c.addSerializer(name, table) + if err != nil { + return err } + t.SetSerializer(serializer) } outputConfig, err := c.buildOutput(name, table) @@ -1469,29 +1467,6 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e return cp, err } -// buildSerializerOld grabs the necessary entries from the ast.Table for creating -// a serializers.Serializer object, and creates it, which can then be added onto -// an Output object. -func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) { - sc := &serializers.Config{TimestampUnits: 1 * time.Second} - - c.getFieldString(tbl, "data_format", &sc.DataFormat) - - if sc.DataFormat == "" { - sc.DataFormat = "influx" - } - - c.getFieldString(tbl, "prefix", &sc.Prefix) - c.getFieldString(tbl, "template", &sc.Template) - c.getFieldStringSlice(tbl, "templates", &sc.Templates) - - if c.hasErrs() { - return nil, c.firstErr() - } - - return serializers.NewSerializer(sc) -} - // buildOutput parses output specific items from the ast.Table, // builds the filter and returns a // models.OutputConfig to be inserted into models.RunningInput @@ -1546,11 +1521,9 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Secret-store options to ignore case "id": - // Parser options to ignore + // Parser and serializer options to ignore case "data_type", "influx_parser_type": - // Serializer options to ignore - case "prefix", "template", "templates": default: c.unusedFieldsMutex.Lock() c.UnusedFields[key] = true @@ -1572,6 +1545,7 @@ func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) { root = root || pt.Implements(reflect.TypeOf((*telegraf.Output)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Aggregator)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Processor)(nil)).Elem()) + root = root || pt.Implements(reflect.TypeOf((*telegraf.StreamingProcessor)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Parser)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Serializer)(nil)).Elem()) diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index a9ba34fa3..2fa765eb8 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -27,19 +27,15 @@ type Execd struct { RestartDelay config.Duration `toml:"restart_delay"` Log telegraf.Logger - parser telegraf.Parser - serializerConfig *serializers.Config - serializer serializers.Serializer - acc telegraf.Accumulator - process *process.Process + parser telegraf.Parser + serializer serializers.Serializer + acc telegraf.Accumulator + process *process.Process } func New() *Execd { return &Execd{ RestartDelay: config.Duration(10 * time.Second), - serializerConfig: &serializers.Config{ - DataFormat: "influx", - }, } } @@ -47,18 +43,18 @@ func (e *Execd) SetParser(p telegraf.Parser) { e.parser = p } +func (e *Execd) SetSerializer(s telegraf.Serializer) { + e.serializer = s +} + func (*Execd) SampleConfig() string { return sampleConfig } func (e *Execd) Start(acc telegraf.Accumulator) error { - var err error - e.serializer, err = serializers.NewSerializer(e.serializerConfig) - if err != nil { - return fmt.Errorf("error creating serializer: %w", err) - } e.acc = acc + var err error e.process, err = process.New(e.Command, e.Environment) if err != nil { return fmt.Errorf("error creating new process: %w", err) diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 339bd9146..b8b57e73a 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -5,14 +5,19 @@ import ( "flag" "fmt" "os" + "path/filepath" "testing" "time" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" + _ "github.com/influxdata/telegraf/plugins/parsers/all" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/processors" + _ "github.com/influxdata/telegraf/plugins/serializers/all" influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -25,6 +30,10 @@ func TestExternalProcessorWorks(t *testing.T) { require.NoError(t, parser.Init()) e.SetParser(parser) + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) + e.SetSerializer(serializer) + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) @@ -90,6 +99,10 @@ func TestParseLinesWithNewLines(t *testing.T) { require.NoError(t, parser.Init()) e.SetParser(parser) + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) + e.SetSerializer(serializer) + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) @@ -199,3 +212,66 @@ func runCountMultiplierProgram() { fmt.Fprint(os.Stdout, string(b)) } } + +func TestCases(t *testing.T) { + // Get all directories in testcases + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Make sure tests contains data + require.NotEmpty(t, folders) + + // Set up for file inputs + processors.AddStreaming("execd", func() telegraf.StreamingProcessor { + return New() + }) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + fname := f.Name() + t.Run(fname, func(t *testing.T) { + testdataPath := filepath.Join("testcases", fname) + configFilename := filepath.Join(testdataPath, "telegraf.conf") + inputFilename := filepath.Join(testdataPath, "input.influx") + expectedFilename := filepath.Join(testdataPath, "expected.out") + + // Get parser to parse input and expected output + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + input, err := testutil.ParseMetricsFromFile(inputFilename, parser) + require.NoError(t, err) + + expected, err := testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Processors, 1, "wrong number of outputs") + plugin := cfg.Processors[0].Processor + + // Process the metrics + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + for _, m := range input { + require.NoError(t, plugin.Add(m, &acc)) + } + plugin.Stop() + + require.Eventually(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond) + + // Check the expectations + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual) + }) + } +} diff --git a/plugins/processors/execd/testcases/dataformat-influx/expected.out b/plugins/processors/execd/testcases/dataformat-influx/expected.out new file mode 100644 index 000000000..44779c033 --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-influx/expected.out @@ -0,0 +1,5 @@ +cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123 +cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456 +cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789 +disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111 +disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/dataformat-influx/input.influx b/plugins/processors/execd/testcases/dataformat-influx/input.influx new file mode 100644 index 000000000..44779c033 --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-influx/input.influx @@ -0,0 +1,5 @@ +cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123 +cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456 +cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789 +disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111 +disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/dataformat-influx/telegraf.conf b/plugins/processors/execd/testcases/dataformat-influx/telegraf.conf new file mode 100644 index 000000000..b72c11bf3 --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-influx/telegraf.conf @@ -0,0 +1,3 @@ +[[processors.execd]] + command = ["cat"] + data_format = "influx" diff --git a/plugins/processors/execd/testcases/dataformat-json/expected.out b/plugins/processors/execd/testcases/dataformat-json/expected.out new file mode 100644 index 000000000..f2c7a0f3e --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-json/expected.out @@ -0,0 +1,5 @@ +cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000 +cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000 +cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000 +disk fields_free=65652391936,fields_inodes_free=40445279,fields_inodes_total=45047808,fields_inodes_used=4602529,fields_total=725328994304,fields_used=622756728832,fields_used_percent=90.4631722684 1678124473000000000 +disk fields_free=65652391936,fields_inodes_free=40445279,fields_inodes_total=45047808,fields_inodes_used=4602529,fields_total=725328994304,fields_used=622756728832,fields_used_percent=90.4631722684 1678124473000000000 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/dataformat-json/input.influx b/plugins/processors/execd/testcases/dataformat-json/input.influx new file mode 100644 index 000000000..44779c033 --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-json/input.influx @@ -0,0 +1,5 @@ +cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123 +cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456 +cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789 +disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111 +disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/dataformat-json/telegraf.conf b/plugins/processors/execd/testcases/dataformat-json/telegraf.conf new file mode 100644 index 000000000..18ce3e19d --- /dev/null +++ b/plugins/processors/execd/testcases/dataformat-json/telegraf.conf @@ -0,0 +1,6 @@ +[[processors.execd]] + command = ["cat"] + data_format = "json" + json_name_key = "name" + json_time_key = "timestamp" + json_time_format = "unix" diff --git a/plugins/processors/execd/testcases/defaults/expected.out b/plugins/processors/execd/testcases/defaults/expected.out new file mode 100644 index 000000000..44779c033 --- /dev/null +++ b/plugins/processors/execd/testcases/defaults/expected.out @@ -0,0 +1,5 @@ +cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123 +cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456 +cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789 +disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111 +disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/defaults/input.influx b/plugins/processors/execd/testcases/defaults/input.influx new file mode 100644 index 000000000..44779c033 --- /dev/null +++ b/plugins/processors/execd/testcases/defaults/input.influx @@ -0,0 +1,5 @@ +cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123 +cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456 +cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789 +disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111 +disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222 \ No newline at end of file diff --git a/plugins/processors/execd/testcases/defaults/telegraf.conf b/plugins/processors/execd/testcases/defaults/telegraf.conf new file mode 100644 index 000000000..55c666991 --- /dev/null +++ b/plugins/processors/execd/testcases/defaults/telegraf.conf @@ -0,0 +1,2 @@ +[[processors.execd]] + command = ["cat"]