From 6a29dcba459ced8c37b28a84dce29bf85e821746 Mon Sep 17 00:00:00 2001 From: Sebastian Spaink <3441183+sspaink@users.noreply.github.com> Date: Tue, 8 Nov 2022 15:16:26 -0600 Subject: [PATCH] fix: Run processors in config order (#12113) --- config/config.go | 18 +++- config/config_test.go | 95 +++++++++++++++++++ .../processor_order/multiple_processors.toml | 7 ++ .../multiple_processors_messy_order.toml | 16 ++++ .../multiple_processors_simple_order.toml | 9 ++ docs/CONFIGURATION.md | 6 +- models/running_processor.go | 39 +++++++- 7 files changed, 180 insertions(+), 10 deletions(-) create mode 100644 config/testdata/processor_order/multiple_processors.toml create mode 100644 config/testdata/processor_order/multiple_processors_messy_order.toml create mode 100644 config/testdata/processor_order/multiple_processors_simple_order.toml diff --git a/config/config.go b/config/config.go index 3eb07600b..b0b18c322 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/google/uuid" "github.com/influxdata/toml" "github.com/influxdata/toml/ast" @@ -396,6 +397,9 @@ func (c *Config) LoadAll(configFiles ...string) error { // LoadConfigData loads TOML-formatted config data func (c *Config) LoadConfigData(data []byte) error { + // Create unique identifier for plugins to identify when using multiple configurations + id := uuid.New() + tbl, err := parseConfig(data) if err != nil { return fmt.Errorf("error parsing data: %s", err) @@ -505,7 +509,7 @@ func (c *Config) LoadConfigData(data []byte) error { switch pluginSubTable := pluginVal.(type) { case []*ast.Table: for _, t := range pluginSubTable { - if err = c.addProcessor(pluginName, t); err != nil { + if err = c.addProcessor(id.String(), pluginName, t); err != nil { return fmt.Errorf("error parsing %s, %w", pluginName, err) } } @@ -746,7 +750,7 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) return running, err } -func (c *Config) addProcessor(name string, table *ast.Table) error { +func (c *Config) addProcessor(id string, name string, table *ast.Table) error { creator, ok := processors.Processors[name] if !ok { // Handle removed, deprecated plugins @@ -768,7 +772,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { c.setLocalMissingTomlFieldTracker(missCount) defer c.resetMissingTomlFieldTracker() - processorConfig, err := c.buildProcessor(name, table) + processorConfig, err := c.buildProcessor(id, name, table) if err != nil { return err } @@ -1062,8 +1066,12 @@ func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig { // buildProcessor parses Processor specific items from the ast.Table, // builds the filter and returns a // models.ProcessorConfig to be inserted into models.RunningProcessor -func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { - conf := &models.ProcessorConfig{Name: name} +func (c *Config) buildProcessor(id string, name string, tbl *ast.Table) (*models.ProcessorConfig, error) { + conf := &models.ProcessorConfig{ + ID: id, + Name: name, + Line: tbl.Line, + } c.getFieldInt64(tbl, "order", &conf.Order) c.getFieldString(tbl, "alias", &conf.Alias) diff --git a/config/config_test.go b/config/config_test.go index 609b77d71..03d3d5853 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -728,6 +728,101 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { } } +func TestConfig_MultipleProcessorsOrder(t *testing.T) { + tests := []struct { + name string + filename []string + expectedOrder []string + }{ + { + name: "Test the order of multiple unique processosr", + filename: []string{"multiple_processors.toml"}, + expectedOrder: []string{ + "processor", + "parser_test", + "processor_parser", + "processor_parserfunc", + }, + }, + { + name: "Test using a single 'order' configuration", + filename: []string{"multiple_processors_simple_order.toml"}, + expectedOrder: []string{ + "parser_test", + "processor_parser", + "processor_parserfunc", + "processor", + }, + }, + { + name: "Test using multiple 'order' configurations", + filename: []string{"multiple_processors_messy_order.toml"}, + expectedOrder: []string{ + "parser_test", + "processor_parserfunc", + "processor", + "processor_parser", + "processor_parser", + "processor_parserfunc", + }, + }, + { + name: "Test loading multiple configuration files", + filename: []string{ + "multiple_processors.toml", + "multiple_processors_simple_order.toml", + }, + expectedOrder: []string{ + "processor", + "parser_test", + "processor_parser", + "processor_parserfunc", + "parser_test", + "processor_parser", + "processor_parserfunc", + "processor", + }, + }, + { + name: "Test loading multiple configuration files both with order", + filename: []string{ + "multiple_processors_simple_order.toml", + "multiple_processors_messy_order.toml", + }, + expectedOrder: []string{ + "parser_test", + "processor_parser", + "processor_parserfunc", + "parser_test", + "processor_parserfunc", + "processor", + "processor", + "processor_parser", + "processor_parser", + "processor_parserfunc", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := NewConfig() + for _, f := range test.filename { + require.NoError(t, c.LoadConfig(filepath.Join("./testdata/processor_order", f))) + } + + require.Equal(t, len(test.expectedOrder), len(c.Processors)) + + var order []string + for _, p := range c.Processors { + order = append(order, p.Config.Name) + } + + require.Equal(t, test.expectedOrder, order) + }) + } +} + func TestConfig_ProcessorsWithParsers(t *testing.T) { formats := []string{ "collectd", diff --git a/config/testdata/processor_order/multiple_processors.toml b/config/testdata/processor_order/multiple_processors.toml new file mode 100644 index 000000000..9a16cb790 --- /dev/null +++ b/config/testdata/processor_order/multiple_processors.toml @@ -0,0 +1,7 @@ +[[processors.processor]] + +[[processors.parser_test]] + +[[processors.processor_parser]] + +[[processors.processor_parserfunc]] diff --git a/config/testdata/processor_order/multiple_processors_messy_order.toml b/config/testdata/processor_order/multiple_processors_messy_order.toml new file mode 100644 index 000000000..0abb84c22 --- /dev/null +++ b/config/testdata/processor_order/multiple_processors_messy_order.toml @@ -0,0 +1,16 @@ +[[processors.parser_test]] + +[[processors.processor_parser]] + order = 2 + +[[processors.processor_parserfunc]] + +[[processors.processor]] + order = 1 + +[[processors.processor_parser]] + order = 3 + +[[processors.processor_parserfunc]] + order = 3 + diff --git a/config/testdata/processor_order/multiple_processors_simple_order.toml b/config/testdata/processor_order/multiple_processors_simple_order.toml new file mode 100644 index 000000000..521c97e6c --- /dev/null +++ b/config/testdata/processor_order/multiple_processors_simple_order.toml @@ -0,0 +1,9 @@ +[[processors.parser_test]] + +[[processors.processor_parser]] + +[[processors.processor_parserfunc]] + +[[processors.processor]] + order = 1 + diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 07ccfd4cd..642b6e959 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -429,8 +429,10 @@ input plugins and before any aggregator plugins. Parameters that can be used with any processor plugin: - **alias**: Name an instance of a plugin. -- **order**: The order in which the processor(s) are executed. If this is not - specified then processor execution order will be random. +- **order**: The order in which the processor(s) are executed. starting with 1. + If this is not specified then processor execution order will be the order in + the config. Processors without "order" will take precedence over those + with a defined order. The [metric filtering][] parameters can be used to limit what metrics are handled by the processor. Excluded metrics are passed downstream to the next diff --git a/models/running_processor.go b/models/running_processor.go index 0e4385741..52fc40f60 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -16,15 +16,48 @@ type RunningProcessor struct { type RunningProcessors []*RunningProcessor -func (rp RunningProcessors) Len() int { return len(rp) } -func (rp RunningProcessors) Swap(i, j int) { rp[i], rp[j] = rp[j], rp[i] } -func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp[j].Config.Order } +func (rp RunningProcessors) Len() int { + return len(rp) +} +func (rp RunningProcessors) Swap(i, j int) { + rp[i], rp[j] = rp[j], rp[i] +} +func (rp RunningProcessors) Less(i, j int) bool { + // If the processors are defined in separate files only sort based on order + if rp[i].Config.ID != rp[j].Config.ID { + return rp[i].Config.Order < rp[j].Config.Order + } + + // If Order is defined for both processors, sort according to the number set + if rp[i].Config.Order != 0 && rp[j].Config.Order != 0 { + // If both orders are equal, ensure config order is maintained + if rp[i].Config.Order == rp[j].Config.Order { + return rp[i].Config.Line < rp[j].Config.Line + } + + return rp[i].Config.Order < rp[j].Config.Order + } + + // If "Order" is defined for one processor but not another, + // the processor without an "Order" will always take precedence. + // This adheres to the original implementation. + if rp[i].Config.Order != 0 { + return false + } + if rp[j].Config.Order != 0 { + return true + } + + return rp[i].Config.Line < rp[j].Config.Line +} // ProcessorConfig containing a name and filter type ProcessorConfig struct { + ID string Name string Alias string Order int64 + Line int Filter Filter }