chore(serializers): Cleanup after migration (#13378)

Sven Rebhan 2023-06-01 20:10:06 +02:00 committed by GitHub
parent 9147afbeca
commit 4a8b1473f7
12 changed files with 159 additions and 72 deletions

View File

@@ -1069,7 +1069,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
     if err != nil {
         return err
     }
-    processorBefore, hasParser, err := c.setupProcessor(processorBeforeConfig.Name, creator, table)
+    processorBefore, count, err := c.setupProcessor(processorBeforeConfig.Name, creator, table)
     if err != nil {
         return err
     }
@@ -1088,10 +1088,9 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
     rf = models.NewRunningProcessor(processorAfter, processorAfterConfig)
     c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf})
-    // Check the number of misses against the threshold
-    if hasParser {
-        missCountThreshold = 2
-    }
+    // Check the number of misses against the threshold. We need to double
+    // the count as the processor setup is executed twice.
+    missCountThreshold = 2 * count
     for key, count := range missCount {
         if count <= missCountThreshold {
             continue
@@ -1104,8 +1103,8 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
     return nil
 }
-func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, bool, error) {
-    var hasParser bool
+func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, int, error) {
+    var optionTestCount int
     streamingProcessor := creator()
@@ -1122,28 +1121,39 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator
     if t, ok := processor.(telegraf.ParserPlugin); ok {
         parser, err := c.addParser("processors", name, table)
         if err != nil {
-            return nil, true, fmt.Errorf("adding parser failed: %w", err)
+            return nil, 0, fmt.Errorf("adding parser failed: %w", err)
         }
         t.SetParser(parser)
-        hasParser = true
+        optionTestCount++
     }
     if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
         if !c.probeParser("processors", name, table) {
-            return nil, false, errors.New("parser not found")
+            return nil, 0, errors.New("parser not found")
         }
         t.SetParserFunc(func() (telegraf.Parser, error) {
             return c.addParser("processors", name, table)
         })
-        hasParser = true
+        optionTestCount++
     }
+    // If the (underlying) processor has a SetSerializer function it can accept
+    // arbitrary data-formats, so build the requested serializer and set it.
+    if t, ok := processor.(telegraf.SerializerPlugin); ok {
+        serializer, err := c.addSerializer(name, table)
+        if err != nil {
+            return nil, 0, fmt.Errorf("adding serializer failed: %w", err)
+        }
+        t.SetSerializer(serializer)
+        optionTestCount++
+    }
     if err := c.toml.UnmarshalTable(table, processor); err != nil {
-        return nil, hasParser, fmt.Errorf("unmarshalling failed: %w", err)
+        return nil, 0, fmt.Errorf("unmarshalling failed: %w", err)
     }
     err := c.printUserDeprecation("processors", name, processor)
-    return streamingProcessor, hasParser, err
+    return streamingProcessor, optionTestCount, err
 }
 func (c *Config) addOutput(name string, table *ast.Table) error {
@@ -1177,32 +1187,20 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
     // arbitrary types of output, so build the serializer and set it.
     if t, ok := output.(telegraf.SerializerPlugin); ok {
         missThreshold = 1
-        if serializer, err := c.addSerializer(name, table); err == nil {
-            t.SetSerializer(serializer)
-        } else {
-            missThreshold = 0
-            // Fallback to the old way of instantiating the parsers.
-            serializer, err := c.buildSerializerOld(table)
-            if err != nil {
-                return err
-            }
-            t.SetSerializer(serializer)
-        }
+        serializer, err := c.addSerializer(name, table)
+        if err != nil {
+            return err
+        }
+        t.SetSerializer(serializer)
     } else if t, ok := output.(serializers.SerializerOutput); ok {
         // Keep the old interface for backward compatibility
         // DEPRECATED: Please switch your plugin to telegraf.Serializers
         missThreshold = 1
-        if serializer, err := c.addSerializer(name, table); err == nil {
-            t.SetSerializer(serializer)
-        } else {
-            missThreshold = 0
-            // Fallback to the old way of instantiating the parsers.
-            serializer, err := c.buildSerializerOld(table)
-            if err != nil {
-                return err
-            }
-            t.SetSerializer(serializer)
-        }
+        serializer, err := c.addSerializer(name, table)
+        if err != nil {
+            return err
+        }
+        t.SetSerializer(serializer)
     }
     outputConfig, err := c.buildOutput(name, table)
@@ -1469,29 +1467,6 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
     return cp, err
 }
-// buildSerializerOld grabs the necessary entries from the ast.Table for creating
-// a serializers.Serializer object, and creates it, which can then be added onto
-// an Output object.
-func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) {
-    sc := &serializers.Config{TimestampUnits: 1 * time.Second}
-    c.getFieldString(tbl, "data_format", &sc.DataFormat)
-    if sc.DataFormat == "" {
-        sc.DataFormat = "influx"
-    }
-    c.getFieldString(tbl, "prefix", &sc.Prefix)
-    c.getFieldString(tbl, "template", &sc.Template)
-    c.getFieldStringSlice(tbl, "templates", &sc.Templates)
-    if c.hasErrs() {
-        return nil, c.firstErr()
-    }
-    return serializers.NewSerializer(sc)
-}
 // buildOutput parses output specific items from the ast.Table,
 // builds the filter and returns a
 // models.OutputConfig to be inserted into models.RunningInput
@@ -1546,11 +1521,9 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
     // Secret-store options to ignore
     case "id":
-    // Parser options to ignore
+    // Parser and serializer options to ignore
     case "data_type", "influx_parser_type":
-    // Serializer options to ignore
-    case "prefix", "template", "templates":
     default:
         c.unusedFieldsMutex.Lock()
         c.UnusedFields[key] = true
@@ -1572,6 +1545,7 @@ func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
         root = root || pt.Implements(reflect.TypeOf((*telegraf.Output)(nil)).Elem())
         root = root || pt.Implements(reflect.TypeOf((*telegraf.Aggregator)(nil)).Elem())
         root = root || pt.Implements(reflect.TypeOf((*telegraf.Processor)(nil)).Elem())
+        root = root || pt.Implements(reflect.TypeOf((*telegraf.StreamingProcessor)(nil)).Elem())
         root = root || pt.Implements(reflect.TypeOf((*telegraf.Parser)(nil)).Elem())
         root = root || pt.Implements(reflect.TypeOf((*telegraf.Serializer)(nil)).Elem())
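
Aside from the diff itself, the miss-count arithmetic above is easier to see in isolation. The following standalone Go sketch mirrors the pattern: every data-format hook discovered on the plugin through a type assertion bumps a counter, and the threshold becomes twice that counter because addProcessor runs the setup twice (once for the instance placed before the aggregators, once for the instance placed after). The interface and struct names below are simplified stand-ins for illustration only, not the real telegraf types.

package main

import "fmt"

// Stand-ins for the real telegraf interfaces; names and signatures are
// simplified for illustration only.
type parserPlugin interface{ SetParser(format string) }

type serializerPlugin interface{ SetSerializer(format string) }

// passThrough pretends to be a processor that accepts both a parser and a serializer.
type passThrough struct{ parserFormat, serializerFormat string }

func (p *passThrough) SetParser(format string)     { p.parserFormat = format }
func (p *passThrough) SetSerializer(format string) { p.serializerFormat = format }

func main() {
    var plugin interface{} = &passThrough{}

    // Count the data-format hooks the plugin exposes, as setupProcessor does.
    optionTestCount := 0
    if t, ok := plugin.(parserPlugin); ok {
        t.SetParser("influx")
        optionTestCount++
    }
    if t, ok := plugin.(serializerPlugin); ok {
        t.SetSerializer("influx")
        optionTestCount++
    }

    // The setup runs twice per processor (before and after the aggregators),
    // so a data-format option may be reported missing up to 2*count times
    // before it is flagged as unused.
    missCountThreshold := 2 * optionTestCount
    fmt.Println("miss-count threshold:", missCountThreshold) // prints: miss-count threshold: 4
}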

View File

@@ -27,19 +27,15 @@ type Execd struct {
     RestartDelay config.Duration `toml:"restart_delay"`
     Log telegraf.Logger
-    parser           telegraf.Parser
-    serializerConfig *serializers.Config
-    serializer       serializers.Serializer
-    acc              telegraf.Accumulator
-    process          *process.Process
+    parser     telegraf.Parser
+    serializer serializers.Serializer
+    acc        telegraf.Accumulator
+    process    *process.Process
 }
 func New() *Execd {
     return &Execd{
         RestartDelay: config.Duration(10 * time.Second),
-        serializerConfig: &serializers.Config{
-            DataFormat: "influx",
-        },
     }
 }
@@ -47,18 +43,18 @@ func (e *Execd) SetParser(p telegraf.Parser) {
     e.parser = p
 }
+func (e *Execd) SetSerializer(s telegraf.Serializer) {
+    e.serializer = s
+}
 func (*Execd) SampleConfig() string {
     return sampleConfig
 }
 func (e *Execd) Start(acc telegraf.Accumulator) error {
-    var err error
-    e.serializer, err = serializers.NewSerializer(e.serializerConfig)
-    if err != nil {
-        return fmt.Errorf("error creating serializer: %w", err)
-    }
     e.acc = acc
+    var err error
     e.process, err = process.New(e.Command, e.Environment)
     if err != nil {
         return fmt.Errorf("error creating new process: %w", err)

View File

@@ -5,14 +5,19 @@ import (
     "flag"
     "fmt"
     "os"
+    "path/filepath"
     "testing"
     "time"
     "github.com/stretchr/testify/require"
     "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/metric"
     _ "github.com/influxdata/telegraf/plugins/parsers/all"
     "github.com/influxdata/telegraf/plugins/parsers/influx"
+    "github.com/influxdata/telegraf/plugins/processors"
+    _ "github.com/influxdata/telegraf/plugins/serializers/all"
+    influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
     "github.com/influxdata/telegraf/testutil"
 )
@@ -25,6 +30,10 @@ func TestExternalProcessorWorks(t *testing.T) {
     require.NoError(t, parser.Init())
     e.SetParser(parser)
+    serializer := &influxSerializer.Serializer{}
+    require.NoError(t, serializer.Init())
+    e.SetSerializer(serializer)
     exe, err := os.Executable()
     require.NoError(t, err)
     t.Log(exe)
@@ -90,6 +99,10 @@ func TestParseLinesWithNewLines(t *testing.T) {
     require.NoError(t, parser.Init())
     e.SetParser(parser)
+    serializer := &influxSerializer.Serializer{}
+    require.NoError(t, serializer.Init())
+    e.SetSerializer(serializer)
     exe, err := os.Executable()
     require.NoError(t, err)
     t.Log(exe)
@@ -199,3 +212,66 @@ func runCountMultiplierProgram() {
         fmt.Fprint(os.Stdout, string(b))
     }
 }
+func TestCases(t *testing.T) {
+    // Get all directories in testcases
+    folders, err := os.ReadDir("testcases")
+    require.NoError(t, err)
+    // Make sure tests contain data
+    require.NotEmpty(t, folders)
+    // Set up for file inputs
+    processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
+        return New()
+    })
+    for _, f := range folders {
+        // Only handle folders
+        if !f.IsDir() {
+            continue
+        }
+        fname := f.Name()
+        t.Run(fname, func(t *testing.T) {
+            testdataPath := filepath.Join("testcases", fname)
+            configFilename := filepath.Join(testdataPath, "telegraf.conf")
+            inputFilename := filepath.Join(testdataPath, "input.influx")
+            expectedFilename := filepath.Join(testdataPath, "expected.out")
+            // Get parser to parse input and expected output
+            parser := &influx.Parser{}
+            require.NoError(t, parser.Init())
+            input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
+            require.NoError(t, err)
+            expected, err := testutil.ParseMetricsFromFile(expectedFilename, parser)
+            require.NoError(t, err)
+            // Configure the plugin
+            cfg := config.NewConfig()
+            require.NoError(t, cfg.LoadConfig(configFilename))
+            require.Len(t, cfg.Processors, 1, "wrong number of processors")
+            plugin := cfg.Processors[0].Processor
+            // Process the metrics
+            var acc testutil.Accumulator
+            require.NoError(t, plugin.Start(&acc))
+            for _, m := range input {
+                require.NoError(t, plugin.Add(m, &acc))
+            }
+            plugin.Stop()
+            require.Eventually(t, func() bool {
+                acc.Lock()
+                defer acc.Unlock()
+                return acc.NMetrics() >= uint64(len(expected))
+            }, time.Second, 100*time.Millisecond)
+            // Check the expectations
+            actual := acc.GetTelegrafMetrics()
+            testutil.RequireMetricsEqual(t, expected, actual)
+        })
+    }
+}
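
The TestCases loop above expects each case to live in its own sub-directory of testcases/ with a fixed set of file names; the individual directory names are not visible in this view, so the layout below is a hedged sketch derived from the filepath.Join calls:

testcases/
  <case-name>/       one directory per test case
    telegraf.conf    processor configuration loaded via cfg.LoadConfig
    input.influx     metrics fed to the processor
    expected.out     metrics the accumulator must contain afterwards

The nine new files that follow form three such cases: one configured with data_format = "influx", one with data_format = "json", and one with no explicit data_format.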

View File

@@ -0,0 +1,5 @@
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123
cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456
cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789
disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111
disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222

View File

@@ -0,0 +1,5 @@
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123
cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456
cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789
disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111
disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222

View File

@@ -0,0 +1,3 @@
[[processors.execd]]
command = ["cat"]
data_format = "influx"

View File

@@ -0,0 +1,5 @@
cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000
cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000
cpu fields_usage_guest=0,fields_usage_guest_nice=0,fields_usage_idle=99.75000000049295,fields_usage_iowait=0,fields_usage_irq=0.1250000000007958,fields_usage_nice=0,fields_usage_softirq=0,fields_usage_steal=0,fields_usage_system=0,fields_usage_user=0.12500000000363798 1678124473000000000
disk fields_free=65652391936,fields_inodes_free=40445279,fields_inodes_total=45047808,fields_inodes_used=4602529,fields_total=725328994304,fields_used=622756728832,fields_used_percent=90.4631722684 1678124473000000000
disk fields_free=65652391936,fields_inodes_free=40445279,fields_inodes_total=45047808,fields_inodes_used=4602529,fields_total=725328994304,fields_used=622756728832,fields_used_percent=90.4631722684 1678124473000000000

View File

@@ -0,0 +1,5 @@
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123
cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456
cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789
disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111
disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222

View File

@@ -0,0 +1,6 @@
[[processors.execd]]
command = ["cat"]
data_format = "json"
json_name_key = "name"
json_time_key = "timestamp"
json_time_format = "unix"

View File

@@ -0,0 +1,5 @@
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123
cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456
cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789
disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111
disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222

View File

@@ -0,0 +1,5 @@
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000123
cpu,cpu=cpu-total,host=Munin usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000456
cpu,cpu=cpu-total,host=Thor usage_guest=0,usage_guest_nice=0,usage_idle=99.75000000049295,usage_iowait=0,usage_irq=0.1250000000007958,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0.12500000000363798 1678124473000000789
disk,device=nvme0n1p4,fstype=ext4,host=Hugin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000111
disk,device=nvme0n1p4,fstype=ext4,host=Munin,mode=rw,path=/ free=65652391936i,inodes_free=40445279i,inodes_total=45047808i,inodes_used=4602529i,total=725328994304i,used=622756728832i,used_percent=90.4631722684 1678124473000000222

View File

@@ -0,0 +1,2 @@
[[processors.execd]]
command = ["cat"]