fix: Run processors in config order (#12113)

This commit is contained in:
Sebastian Spaink 2022-11-08 15:16:26 -06:00 committed by GitHub
parent 6390d31eb0
commit 6a29dcba45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 10 deletions

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/coreos/go-semver/semver"
"github.com/google/uuid"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
@ -396,6 +397,9 @@ func (c *Config) LoadAll(configFiles ...string) error {
// LoadConfigData loads TOML-formatted config data
func (c *Config) LoadConfigData(data []byte) error {
// Create unique identifier for plugins to identify when using multiple configurations
id := uuid.New()
tbl, err := parseConfig(data)
if err != nil {
return fmt.Errorf("error parsing data: %s", err)
@ -505,7 +509,7 @@ func (c *Config) LoadConfigData(data []byte) error {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
if err = c.addProcessor(id.String(), pluginName, t); err != nil {
return fmt.Errorf("error parsing %s, %w", pluginName, err)
}
}
@ -746,7 +750,7 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
return running, err
}
func (c *Config) addProcessor(name string, table *ast.Table) error {
func (c *Config) addProcessor(id string, name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
// Handle removed, deprecated plugins
@ -768,7 +772,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
processorConfig, err := c.buildProcessor(name, table)
processorConfig, err := c.buildProcessor(id, name, table)
if err != nil {
return err
}
@ -1062,8 +1066,12 @@ func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig {
// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
func (c *Config) buildProcessor(id string, name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{
ID: id,
Name: name,
Line: tbl.Line,
}
c.getFieldInt64(tbl, "order", &conf.Order)
c.getFieldString(tbl, "alias", &conf.Alias)

View File

@ -728,6 +728,101 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
}
}
func TestConfig_MultipleProcessorsOrder(t *testing.T) {
tests := []struct {
name string
filename []string
expectedOrder []string
}{
{
name: "Test the order of multiple unique processosr",
filename: []string{"multiple_processors.toml"},
expectedOrder: []string{
"processor",
"parser_test",
"processor_parser",
"processor_parserfunc",
},
},
{
name: "Test using a single 'order' configuration",
filename: []string{"multiple_processors_simple_order.toml"},
expectedOrder: []string{
"parser_test",
"processor_parser",
"processor_parserfunc",
"processor",
},
},
{
name: "Test using multiple 'order' configurations",
filename: []string{"multiple_processors_messy_order.toml"},
expectedOrder: []string{
"parser_test",
"processor_parserfunc",
"processor",
"processor_parser",
"processor_parser",
"processor_parserfunc",
},
},
{
name: "Test loading multiple configuration files",
filename: []string{
"multiple_processors.toml",
"multiple_processors_simple_order.toml",
},
expectedOrder: []string{
"processor",
"parser_test",
"processor_parser",
"processor_parserfunc",
"parser_test",
"processor_parser",
"processor_parserfunc",
"processor",
},
},
{
name: "Test loading multiple configuration files both with order",
filename: []string{
"multiple_processors_simple_order.toml",
"multiple_processors_messy_order.toml",
},
expectedOrder: []string{
"parser_test",
"processor_parser",
"processor_parserfunc",
"parser_test",
"processor_parserfunc",
"processor",
"processor",
"processor_parser",
"processor_parser",
"processor_parserfunc",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := NewConfig()
for _, f := range test.filename {
require.NoError(t, c.LoadConfig(filepath.Join("./testdata/processor_order", f)))
}
require.Equal(t, len(test.expectedOrder), len(c.Processors))
var order []string
for _, p := range c.Processors {
order = append(order, p.Config.Name)
}
require.Equal(t, test.expectedOrder, order)
})
}
}
func TestConfig_ProcessorsWithParsers(t *testing.T) {
formats := []string{
"collectd",

View File

@ -0,0 +1,7 @@
[[processors.processor]]
[[processors.parser_test]]
[[processors.processor_parser]]
[[processors.processor_parserfunc]]

View File

@ -0,0 +1,16 @@
[[processors.parser_test]]
[[processors.processor_parser]]
order = 2
[[processors.processor_parserfunc]]
[[processors.processor]]
order = 1
[[processors.processor_parser]]
order = 3
[[processors.processor_parserfunc]]
order = 3

View File

@ -0,0 +1,9 @@
[[processors.parser_test]]
[[processors.processor_parser]]
[[processors.processor_parserfunc]]
[[processors.processor]]
order = 1

View File

@ -429,8 +429,10 @@ input plugins and before any aggregator plugins.
Parameters that can be used with any processor plugin:
- **alias**: Name an instance of a plugin.
- **order**: The order in which the processor(s) are executed. If this is not
specified then processor execution order will be random.
- **order**: The order in which the processor(s) are executed. starting with 1.
If this is not specified then processor execution order will be the order in
the config. Processors without "order" will take precedence over those
with a defined order.
The [metric filtering][] parameters can be used to limit what metrics are
handled by the processor. Excluded metrics are passed downstream to the next

View File

@ -16,15 +16,48 @@ type RunningProcessor struct {
type RunningProcessors []*RunningProcessor
func (rp RunningProcessors) Len() int { return len(rp) }
func (rp RunningProcessors) Swap(i, j int) { rp[i], rp[j] = rp[j], rp[i] }
func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp[j].Config.Order }
func (rp RunningProcessors) Len() int {
return len(rp)
}
func (rp RunningProcessors) Swap(i, j int) {
rp[i], rp[j] = rp[j], rp[i]
}
func (rp RunningProcessors) Less(i, j int) bool {
// If the processors are defined in separate files only sort based on order
if rp[i].Config.ID != rp[j].Config.ID {
return rp[i].Config.Order < rp[j].Config.Order
}
// If Order is defined for both processors, sort according to the number set
if rp[i].Config.Order != 0 && rp[j].Config.Order != 0 {
// If both orders are equal, ensure config order is maintained
if rp[i].Config.Order == rp[j].Config.Order {
return rp[i].Config.Line < rp[j].Config.Line
}
return rp[i].Config.Order < rp[j].Config.Order
}
// If "Order" is defined for one processor but not another,
// the processor without an "Order" will always take precedence.
// This adheres to the original implementation.
if rp[i].Config.Order != 0 {
return false
}
if rp[j].Config.Order != 0 {
return true
}
return rp[i].Config.Line < rp[j].Config.Line
}
// ProcessorConfig containing a name and filter
type ProcessorConfig struct {
ID string
Name string
Alias string
Order int64
Line int
Filter Filter
}