fix(parsers): Memory leak for plugins using ParserFunc. (#11815)
This commit is contained in:
parent
d637a665e8
commit
3b3584b40b
|
|
@ -200,13 +200,6 @@ func (a *Agent) initPlugins() error {
|
||||||
input.LogName(), err)
|
input.LogName(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, parser := range a.Config.Parsers {
|
|
||||||
err := parser.Init()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not initialize parser %s::%s: %v",
|
|
||||||
parser.Config.DataFormat, parser.Config.Parent, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, processor := range a.Config.Processors {
|
for _, processor := range a.Config.Processors {
|
||||||
err := processor.Init()
|
err := processor.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -69,10 +69,11 @@ type Config struct {
|
||||||
Inputs []*models.RunningInput
|
Inputs []*models.RunningInput
|
||||||
Outputs []*models.RunningOutput
|
Outputs []*models.RunningOutput
|
||||||
Aggregators []*models.RunningAggregator
|
Aggregators []*models.RunningAggregator
|
||||||
Parsers []*models.RunningParser
|
|
||||||
// Processors have a slice wrapper type because they need to be sorted
|
// Processors have a slice wrapper type because they need to be sorted
|
||||||
Processors models.RunningProcessors
|
Processors models.RunningProcessors
|
||||||
AggProcessors models.RunningProcessors
|
AggProcessors models.RunningProcessors
|
||||||
|
// Parsers are created by their inputs during gather. Config doesn't keep track of them
|
||||||
|
// like the other plugins because they need to be garbage collected (See issue #11809)
|
||||||
|
|
||||||
Deprecations map[string][]int64
|
Deprecations map[string][]int64
|
||||||
version *semver.Version
|
version *semver.Version
|
||||||
|
|
@ -98,7 +99,6 @@ func NewConfig() *Config {
|
||||||
Tags: make(map[string]string),
|
Tags: make(map[string]string),
|
||||||
Inputs: make([]*models.RunningInput, 0),
|
Inputs: make([]*models.RunningInput, 0),
|
||||||
Outputs: make([]*models.RunningOutput, 0),
|
Outputs: make([]*models.RunningOutput, 0),
|
||||||
Parsers: make([]*models.RunningParser, 0),
|
|
||||||
Processors: make([]*models.RunningProcessor, 0),
|
Processors: make([]*models.RunningProcessor, 0),
|
||||||
AggProcessors: make([]*models.RunningProcessor, 0),
|
AggProcessors: make([]*models.RunningProcessor, 0),
|
||||||
InputFilters: make([]string, 0),
|
InputFilters: make([]string, 0),
|
||||||
|
|
@ -242,15 +242,6 @@ func (c *Config) AggregatorNames() []string {
|
||||||
return PluginNameCounts(name)
|
return PluginNameCounts(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParserNames returns a list of strings of the configured parsers.
|
|
||||||
func (c *Config) ParserNames() []string {
|
|
||||||
var name []string
|
|
||||||
for _, parser := range c.Parsers {
|
|
||||||
name = append(name, parser.Config.DataFormat)
|
|
||||||
}
|
|
||||||
return PluginNameCounts(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProcessorNames returns a list of strings of the configured processors.
|
// ProcessorNames returns a list of strings of the configured processors.
|
||||||
func (c *Config) ProcessorNames() []string {
|
func (c *Config) ProcessorNames() []string {
|
||||||
var name []string
|
var name []string
|
||||||
|
|
@ -725,9 +716,8 @@ func (c *Config) addParser(parentname string, table *ast.Table) (*models.Running
|
||||||
}
|
}
|
||||||
|
|
||||||
running := models.NewRunningParser(parser, conf)
|
running := models.NewRunningParser(parser, conf)
|
||||||
c.Parsers = append(c.Parsers, running)
|
err = running.Init()
|
||||||
|
return running, err
|
||||||
return running, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) addProcessor(name string, table *ast.Table) error {
|
func (c *Config) addProcessor(name string, table *ast.Table) error {
|
||||||
|
|
@ -888,12 +878,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
||||||
return errors.New("parser not found")
|
return errors.New("parser not found")
|
||||||
}
|
}
|
||||||
t.SetParserFunc(func() (telegraf.Parser, error) {
|
t.SetParserFunc(func() (telegraf.Parser, error) {
|
||||||
parser, err := c.addParser(name, table)
|
return c.addParser(name, table)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = parser.Init()
|
|
||||||
return parser, err
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -903,12 +888,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
||||||
return errors.New("parser not found")
|
return errors.New("parser not found")
|
||||||
}
|
}
|
||||||
t.SetParserFunc(func() (parsers.Parser, error) {
|
t.SetParserFunc(func() (parsers.Parser, error) {
|
||||||
parser, err := c.addParser(name, table)
|
return c.addParser(name, table)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = parser.Init()
|
|
||||||
return parser, err
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -497,7 +497,6 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
// Get the parser set with 'SetParser()'
|
// Get the parser set with 'SetParser()'
|
||||||
if p, ok := input.Parser.(*models.RunningParser); ok {
|
if p, ok := input.Parser.(*models.RunningParser); ok {
|
||||||
require.NoError(t, p.Init())
|
|
||||||
actual = append(actual, p.Parser)
|
actual = append(actual, p.Parser)
|
||||||
} else {
|
} else {
|
||||||
actual = append(actual, input.Parser)
|
actual = append(actual, input.Parser)
|
||||||
|
|
@ -618,7 +617,6 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
// Get the parser set with 'SetParser()'
|
// Get the parser set with 'SetParser()'
|
||||||
if p, ok := input.Parser.(*models.RunningParser); ok {
|
if p, ok := input.Parser.(*models.RunningParser); ok {
|
||||||
require.NoError(t, p.Init())
|
|
||||||
actual = append(actual, p.Parser)
|
actual = append(actual, p.Parser)
|
||||||
} else {
|
} else {
|
||||||
actual = append(actual, input.Parser)
|
actual = append(actual, input.Parser)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue