diff --git a/agent/agent.go b/agent/agent.go index adf0a545c..4d5a2bfa4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -7,7 +7,6 @@ import ( "log" "os" "runtime" - "sort" "sync" "time" @@ -603,14 +602,16 @@ func (a *Agent) startProcessors( dst chan<- telegraf.Metric, processors models.RunningProcessors, ) (chan<- telegraf.Metric, []*processorUnit, error) { - // Sort from last to first - sort.SliceStable(processors, func(i, j int) bool { - return processors[i].Config.Order > processors[j].Config.Order - }) - var src chan telegraf.Metric units := make([]*processorUnit, 0, len(processors)) - for _, processor := range processors { + // The processor chain is constructed from the output side starting from + // the output(s) and walking the way back to the input(s). However, the + // processor-list is sorted by order and/or by appearance in the config, + // i.e. in input-to-output direction. Therefore, reverse the processor list + // to reflect the order/definition order in the processing chain. + for i := len(processors) - 1; i >= 0; i-- { + processor := processors[i] + src = make(chan telegraf.Metric, 100) acc := NewAccumulator(processor, dst) diff --git a/agent/agent_test.go b/agent/agent_test.go index 828f0dd75..c7d3fea98 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,14 +1,26 @@ package agent import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/models" + _ "github.com/influxdata/telegraf/plugins/aggregators/all" _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" + "github.com/influxdata/telegraf/plugins/parsers/influx" + _ "github.com/influxdata/telegraf/plugins/processors/all" + "github.com/influxdata/telegraf/testutil" ) func TestAgent_OmitHostname(t *testing.T) { @@ -165,3 +177,86 @@ func TestWindow(t *testing.T) { }) } } + +func TestCases(t *testing.T) { + // Get all directories in testcases + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Make sure tests contains data + require.NotEmpty(t, folders) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + fname := f.Name() + testdataPath := filepath.Join("testcases", fname) + configFilename := filepath.Join(testdataPath, "telegraf.conf") + expectedFilename := filepath.Join(testdataPath, "expected.out") + + t.Run(fname, func(t *testing.T) { + // Get parser to parse input and expected output + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + expected, err := testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + require.NotEmpty(t, expected) + + // Load the config and inject the mock output to be able to verify + // the resulting metrics + cfg := config.NewConfig() + require.NoError(t, cfg.LoadAll(configFilename)) + require.Empty(t, cfg.Outputs, "No output(s) allowed in the config!") + + // Setup the agent and run the agent in "once" mode + agent := NewAgent(cfg) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + actual, err := collect(ctx, agent, 0) + require.NoError(t, err) + + // Process expected metrics and compare with resulting metrics + options := []cmp.Option{ + testutil.IgnoreTags("host"), + } + if expected[0].Time().IsZero() { + options = append(options, testutil.IgnoreTime()) + } + testutil.RequireMetricsEqual(t, expected, actual, options...) + }) + } +} + +// Implement a "test-mode" like call but collect the metrics +func collect(ctx context.Context, a *Agent, wait time.Duration) ([]telegraf.Metric, error) { + var received []telegraf.Metric + var mu sync.Mutex + + src := make(chan telegraf.Metric, 100) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for m := range src { + mu.Lock() + received = append(received, m) + mu.Unlock() + m.Reject() + } + }() + + if err := a.runTest(ctx, wait, src); err != nil { + return nil, err + } + wg.Wait() + + if models.GlobalGatherErrors.Get() != 0 { + return received, fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get()) + } + return received, nil +} diff --git a/agent/testcases/processor-order-appearance/expected.out b/agent/testcases/processor-order-appearance/expected.out new file mode 100644 index 000000000..6f41ac361 --- /dev/null +++ b/agent/testcases/processor-order-appearance/expected.out @@ -0,0 +1,2 @@ +new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713 +old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000 diff --git a/agent/testcases/processor-order-appearance/input.influx b/agent/testcases/processor-order-appearance/input.influx new file mode 100644 index 000000000..2bcf853c7 --- /dev/null +++ b/agent/testcases/processor-order-appearance/input.influx @@ -0,0 +1 @@ +old_metric_from_mock,mood=good value=23i 1689253834000000000 diff --git a/agent/testcases/processor-order-appearance/telegraf.conf b/agent/testcases/processor-order-appearance/telegraf.conf new file mode 100644 index 000000000..6e4858450 --- /dev/null +++ b/agent/testcases/processor-order-appearance/telegraf.conf @@ -0,0 +1,26 @@ +# Test for using the appearance order in the file for processor order +[[inputs.file]] + files = ["testcases/processor-order-appearance/input.influx"] + data_format = "influx" + +[[processors.starlark]] + source = ''' +def apply(metric): + metrics = [] + + m = Metric("new_metric_from_starlark") + m.tags["foo"] = "bar" + m.fields["baz"] = 42 + m.time = 1689252834197709713 + metrics.append(m) + metrics.append(metric) + + return metrics +''' + +[[processors.date]] + field_key = "timestamp" + date_format = "2006-01-02T15:04:05.999999999Z" + timezone = "UTC" + + diff --git a/agent/testcases/processor-order-explicit/expected.out b/agent/testcases/processor-order-explicit/expected.out new file mode 100644 index 000000000..6f41ac361 --- /dev/null +++ b/agent/testcases/processor-order-explicit/expected.out @@ -0,0 +1,2 @@ +new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713 +old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000 diff --git a/agent/testcases/processor-order-explicit/input.influx b/agent/testcases/processor-order-explicit/input.influx new file mode 100644 index 000000000..2bcf853c7 --- /dev/null +++ b/agent/testcases/processor-order-explicit/input.influx @@ -0,0 +1 @@ +old_metric_from_mock,mood=good value=23i 1689253834000000000 diff --git a/agent/testcases/processor-order-explicit/telegraf.conf b/agent/testcases/processor-order-explicit/telegraf.conf new file mode 100644 index 000000000..3729221f3 --- /dev/null +++ b/agent/testcases/processor-order-explicit/telegraf.conf @@ -0,0 +1,27 @@ +# Test for specifying an explicit processor order +[[inputs.file]] + files = ["testcases/processor-order-explicit/input.influx"] + data_format = "influx" + + +[[processors.date]] + field_key = "timestamp" + date_format = "2006-01-02T15:04:05.999999999Z" + timezone = "UTC" + order = 2 + +[[processors.starlark]] + source = ''' +def apply(metric): + metrics = [] + + m = Metric("new_metric_from_starlark") + m.tags["foo"] = "bar" + m.fields["baz"] = 42 + m.time = 1689252834197709713 + metrics.append(m) + metrics.append(metric) + + return metrics +''' + order = 1 diff --git a/agent/testcases/processor-order-mixed/expected.out b/agent/testcases/processor-order-mixed/expected.out new file mode 100644 index 000000000..6f41ac361 --- /dev/null +++ b/agent/testcases/processor-order-mixed/expected.out @@ -0,0 +1,2 @@ +new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713 +old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000 diff --git a/agent/testcases/processor-order-mixed/input.influx b/agent/testcases/processor-order-mixed/input.influx new file mode 100644 index 000000000..2bcf853c7 --- /dev/null +++ b/agent/testcases/processor-order-mixed/input.influx @@ -0,0 +1 @@ +old_metric_from_mock,mood=good value=23i 1689253834000000000 diff --git a/agent/testcases/processor-order-mixed/telegraf.conf b/agent/testcases/processor-order-mixed/telegraf.conf new file mode 100644 index 000000000..5be9ba12f --- /dev/null +++ b/agent/testcases/processor-order-mixed/telegraf.conf @@ -0,0 +1,25 @@ +# Test for using the appearance order in the file for processor order +[[inputs.file]] + files = ["testcases/processor-order-appearance/input.influx"] + data_format = "influx" + +[[processors.starlark]] + source = ''' +def apply(metric): + metrics = [] + + m = Metric("new_metric_from_starlark") + m.tags["foo"] = "bar" + m.fields["baz"] = 42 + m.time = 1689252834197709713 + metrics.append(m) + metrics.append(metric) + + return metrics +''' + +[[processors.date]] + field_key = "timestamp" + date_format = "2006-01-02T15:04:05.999999999Z" + timezone = "UTC" + order = 1 diff --git a/agent/testcases/processor-order-no-starlark/expected.out b/agent/testcases/processor-order-no-starlark/expected.out new file mode 100644 index 000000000..c06a4cead --- /dev/null +++ b/agent/testcases/processor-order-no-starlark/expected.out @@ -0,0 +1,2 @@ +new_metric_from_starlark,foo=bar baz=42i 1689252834197709713 +old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000 diff --git a/agent/testcases/processor-order-no-starlark/input.influx b/agent/testcases/processor-order-no-starlark/input.influx new file mode 100644 index 000000000..2bcf853c7 --- /dev/null +++ b/agent/testcases/processor-order-no-starlark/input.influx @@ -0,0 +1 @@ +old_metric_from_mock,mood=good value=23i 1689253834000000000 diff --git a/agent/testcases/processor-order-no-starlark/telegraf.conf b/agent/testcases/processor-order-no-starlark/telegraf.conf new file mode 100644 index 000000000..0eb2e4362 --- /dev/null +++ b/agent/testcases/processor-order-no-starlark/telegraf.conf @@ -0,0 +1,26 @@ +# Test for using the appearance order in the file for processor order. +# This will not add the "timestamp" field as the starlark processor runs _after_ +# the date processor. +[[inputs.file]] + files = ["testcases/processor-order-no-starlark/input.influx"] + data_format = "influx" + +[[processors.date]] + field_key = "timestamp" + date_format = "2006-01-02T15:04:05.999999999Z" + timezone = "UTC" + +[[processors.starlark]] + source = ''' +def apply(metric): + metrics = [] + + m = Metric("new_metric_from_starlark") + m.tags["foo"] = "bar" + m.fields["baz"] = 42 + m.time = 1689252834197709713 + metrics.append(m) + metrics.append(metric) + + return metrics +'''