fix(processors): Correctly setup processors (#12081)

This commit is contained in:
Sven Rebhan 2022-10-24 21:21:24 +02:00 committed by GitHub
parent 4b78088979
commit 87125f0524
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 45 deletions

View File

@ -754,7 +754,6 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
}
return fmt.Errorf("undefined but requested processor: %s", name)
}
streamingProcessor := creator()
// For processors with parsers we need to compute the set of
// options that is not covered by both, the parser and the processor.
@ -772,51 +771,26 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return err
}
var processor interface{}
processor = streamingProcessor
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
}
// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
missCountThreshold = 2
parser, err := c.addParser("processors", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}
if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
missCountThreshold = 2
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
}
// Set up the processor
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
// Setup the processor running before the aggregators
processorBefore, hasParser, err := c.setupProcessor(processorConfig.Name, creator, table)
if err != nil {
return err
}
rf := models.NewRunningProcessor(streamingProcessor, processorConfig)
rf := models.NewRunningProcessor(processorBefore, processorConfig)
c.Processors = append(c.Processors, rf)
// Save a copy for the aggregator
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
// Setup another (new) processor instance running after the aggregator
processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table)
if err != nil {
return err
}
rf = models.NewRunningProcessor(streamingProcessor, processorConfig)
rf = models.NewRunningProcessor(processorAfter, processorConfig)
c.AggProcessors = append(c.AggProcessors, rf)
// Check the number of misses against the threshold
if hasParser {
missCountThreshold = 2
}
for key, count := range missCount {
if count <= missCountThreshold {
continue
@ -829,20 +803,46 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return nil
}
func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error {
if p, ok := processor.(unwrappable); ok {
unwrapped := p.Unwrap()
if err := c.toml.UnmarshalTable(table, unwrapped); err != nil {
return fmt.Errorf("unmarshalling unwrappable failed: %w", err)
func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, bool, error) {
var hasParser bool
streamingProcessor := creator()
var processor interface{}
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
} else {
processor = streamingProcessor
}
// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("processors", name, table)
if err != nil {
return nil, true, fmt.Errorf("adding parser failed: %w", err)
}
return c.printUserDeprecation("processors", name, unwrapped)
t.SetParser(parser)
hasParser = true
}
if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return nil, false, errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
hasParser = true
}
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return fmt.Errorf("unmarshalling failed: %w", err)
return nil, hasParser, fmt.Errorf("unmarshalling failed: %w", err)
}
return c.printUserDeprecation("processors", name, processor)
err := c.printUserDeprecation("processors", name, processor)
return streamingProcessor, hasParser, err
}
func (c *Config) addOutput(name string, table *ast.Table) error {