fix(config): set default parser (#12076)
This commit is contained in:
parent
87125f0524
commit
73bac76268
|
|
@ -689,9 +689,12 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) probeParser(table *ast.Table) bool {
|
func (c *Config) probeParser(parentcategory string, parentname string, table *ast.Table) bool {
|
||||||
var dataformat string
|
var dataformat string
|
||||||
c.getFieldString(table, "data_format", &dataformat)
|
c.getFieldString(table, "data_format", &dataformat)
|
||||||
|
if dataformat == "" {
|
||||||
|
dataformat = setDefaultParser(parentcategory, parentname)
|
||||||
|
}
|
||||||
|
|
||||||
creator, ok := parsers.Parsers[dataformat]
|
creator, ok := parsers.Parsers[dataformat]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -709,15 +712,10 @@ func (c *Config) probeParser(table *ast.Table) bool {
|
||||||
func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
|
func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
|
||||||
var dataformat string
|
var dataformat string
|
||||||
c.getFieldString(table, "data_format", &dataformat)
|
c.getFieldString(table, "data_format", &dataformat)
|
||||||
|
|
||||||
if dataformat == "" {
|
if dataformat == "" {
|
||||||
if parentcategory == "inputs" && parentname == "exec" {
|
dataformat = setDefaultParser(parentcategory, parentname)
|
||||||
// Legacy support, exec plugin originally parsed JSON by default.
|
|
||||||
dataformat = "json"
|
|
||||||
} else {
|
|
||||||
dataformat = "influx"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var influxParserType string
|
var influxParserType string
|
||||||
c.getFieldString(table, "influx_parser_type", &influxParserType)
|
c.getFieldString(table, "influx_parser_type", &influxParserType)
|
||||||
if dataformat == "influx" && influxParserType == "upstream" {
|
if dataformat == "influx" && influxParserType == "upstream" {
|
||||||
|
|
@ -828,7 +826,7 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator
|
||||||
}
|
}
|
||||||
|
|
||||||
if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
|
if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
|
||||||
if !c.probeParser(table) {
|
if !c.probeParser("processors", name, table) {
|
||||||
return nil, false, errors.New("parser not found")
|
return nil, false, errors.New("parser not found")
|
||||||
}
|
}
|
||||||
t.SetParserFunc(func() (telegraf.Parser, error) {
|
t.SetParserFunc(func() (telegraf.Parser, error) {
|
||||||
|
|
@ -946,7 +944,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
||||||
|
|
||||||
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
|
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
|
||||||
missCountThreshold = 1
|
missCountThreshold = 1
|
||||||
if !c.probeParser(table) {
|
if !c.probeParser("inputs", name, table) {
|
||||||
return errors.New("parser not found")
|
return errors.New("parser not found")
|
||||||
}
|
}
|
||||||
t.SetParserFunc(func() (telegraf.Parser, error) {
|
t.SetParserFunc(func() (telegraf.Parser, error) {
|
||||||
|
|
@ -957,7 +955,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
||||||
if t, ok := input.(parsers.ParserFuncInput); ok {
|
if t, ok := input.(parsers.ParserFuncInput); ok {
|
||||||
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
|
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
|
||||||
missCountThreshold = 1
|
missCountThreshold = 1
|
||||||
if !c.probeParser(table) {
|
if !c.probeParser("inputs", name, table) {
|
||||||
return errors.New("parser not found")
|
return errors.New("parser not found")
|
||||||
}
|
}
|
||||||
t.SetParserFunc(func() (parsers.Parser, error) {
|
t.SetParserFunc(func() (parsers.Parser, error) {
|
||||||
|
|
@ -1431,6 +1429,15 @@ func keys(m map[string]bool) []string {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setDefaultParser(category string, name string) string {
|
||||||
|
// Legacy support, exec plugin originally parsed JSON by default.
|
||||||
|
if category == "inputs" && name == "exec" {
|
||||||
|
return "json"
|
||||||
|
}
|
||||||
|
|
||||||
|
return "influx"
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Config) hasErrs() bool {
|
func (c *Config) hasErrs() bool {
|
||||||
return len(c.errs) > 0
|
return len(c.errs) > 0
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -265,6 +265,16 @@ func TestConfig_WrongCertPath(t *testing.T) {
|
||||||
require.Error(t, c.LoadConfig("./testdata/wrong_cert_path.toml"))
|
require.Error(t, c.LoadConfig("./testdata/wrong_cert_path.toml"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConfig_DefaultParser(t *testing.T) {
|
||||||
|
c := NewConfig()
|
||||||
|
require.NoError(t, c.LoadConfig("./testdata/default_parser.toml"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConfig_DefaultExecParser(t *testing.T) {
|
||||||
|
c := NewConfig()
|
||||||
|
require.NoError(t, c.LoadConfig("./testdata/default_parser_exec.toml"))
|
||||||
|
}
|
||||||
|
|
||||||
func TestConfig_LoadSpecialTypes(t *testing.T) {
|
func TestConfig_LoadSpecialTypes(t *testing.T) {
|
||||||
c := NewConfig()
|
c := NewConfig()
|
||||||
require.NoError(t, c.LoadConfig("./testdata/special_types.toml"))
|
require.NoError(t, c.LoadConfig("./testdata/special_types.toml"))
|
||||||
|
|
@ -886,6 +896,7 @@ type MockupInputPlugin struct {
|
||||||
Paths []string `toml:"paths"`
|
Paths []string `toml:"paths"`
|
||||||
Port int `toml:"port"`
|
Port int `toml:"port"`
|
||||||
Command string
|
Command string
|
||||||
|
Files []string
|
||||||
PidFile string
|
PidFile string
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
tls.ServerConfig
|
tls.ServerConfig
|
||||||
|
|
@ -1069,6 +1080,9 @@ func init() {
|
||||||
inputs.Add("exec", func() telegraf.Input {
|
inputs.Add("exec", func() telegraf.Input {
|
||||||
return &MockupInputPlugin{Timeout: Duration(time.Second * 5)}
|
return &MockupInputPlugin{Timeout: Duration(time.Second * 5)}
|
||||||
})
|
})
|
||||||
|
inputs.Add("file", func() telegraf.Input {
|
||||||
|
return &MockupInputPlugin{}
|
||||||
|
})
|
||||||
inputs.Add("http_listener_v2", func() telegraf.Input {
|
inputs.Add("http_listener_v2", func() telegraf.Input {
|
||||||
return &MockupInputPlugin{}
|
return &MockupInputPlugin{}
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["metrics"]
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
[[inputs.exec]]
|
||||||
|
command = '/usr/bin/echo {"value": 42}'
|
||||||
Loading…
Reference in New Issue