chore: More parser cleanup (#11532)

This commit is contained in:
Sven Rebhan 2022-07-28 22:30:36 +02:00 committed by GitHub
parent fd84042220
commit 3ad9fe73ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 119 additions and 345 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"log"
@ -24,14 +25,12 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/toml"
@ -692,6 +691,15 @@ func (c *Config) addParser(parentname string, table *ast.Table) (*models.Running
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
if dataformat == "" {
if parentname == "exec" {
// Legacy support, exec plugin originally parsed JSON by default.
dataformat = "json"
} else {
dataformat = "influx"
}
}
creator, ok := parsers.Parsers[dataformat]
if !ok {
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
@ -830,7 +838,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
@ -850,92 +857,50 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
parser, err := c.addParser(name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
parser, err := c.addParser(name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}
if t, ok := input.(telegraf.ParserFuncInput); ok {
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.buildParserOld(name, config)
})
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
}
if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (parsers.Parser, error) {
return c.buildParserOld(name, config)
})
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
}
pluginConfig, err := c.buildInput(name, table)
@ -963,7 +928,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
if count <= 1 {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
@ -1119,173 +1084,6 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
return cp, nil
}
// buildParserOld creates a parsers.Parser from the legacy configuration
// structure, attaches a per-plugin logger, and runs the parser's Init
// routine (when it implements telegraf.Initializer) so the result is
// ready to be attached to an Input object.
func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) {
	parser, err := parsers.NewParser(config)
	if err != nil {
		return nil, err
	}

	// Wire up the logger before initialization so Init can log.
	models.SetLoggerOnPlugin(parser, models.NewLogger("parsers", config.DataFormat, name))

	initializer, ok := parser.(telegraf.Initializer)
	if !ok {
		// Parser needs no explicit initialization.
		return parser, nil
	}
	if err := initializer.Init(); err != nil {
		return nil, err
	}
	return parser, nil
}
// getParserConfig collects all parser-related entries from the given
// ast.Table into a legacy parsers.Config for the old-style parser
// construction path (see buildParserOld). The table is the plugin's own
// TOML section; name is the plugin name and becomes the metric name.
// Field-read errors are accumulated on c and reported at the end.
func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
pc := &parsers.Config{
JSONStrict: true,
}
c.getFieldString(tbl, "data_format", &pc.DataFormat)
// Legacy support, exec plugin originally parsed JSON by default.
if name == "exec" && pc.DataFormat == "" {
pc.DataFormat = "json"
} else if pc.DataFormat == "" {
pc.DataFormat = "influx"
}
// Generic / graphite-style options.
c.getFieldString(tbl, "separator", &pc.Separator)
c.getFieldStringSlice(tbl, "templates", &pc.Templates)
// JSON (v1) parser options.
c.getFieldStringSlice(tbl, "tag_keys", &pc.TagKeys)
c.getFieldStringSlice(tbl, "json_string_fields", &pc.JSONStringFields)
c.getFieldString(tbl, "json_name_key", &pc.JSONNameKey)
c.getFieldString(tbl, "json_query", &pc.JSONQuery)
c.getFieldString(tbl, "json_time_key", &pc.JSONTimeKey)
c.getFieldString(tbl, "json_time_format", &pc.JSONTimeFormat)
c.getFieldString(tbl, "json_timezone", &pc.JSONTimezone)
c.getFieldBool(tbl, "json_strict", &pc.JSONStrict)
// Value parser option.
c.getFieldString(tbl, "data_type", &pc.DataType)
// Collectd parser options.
c.getFieldString(tbl, "collectd_auth_file", &pc.CollectdAuthFile)
c.getFieldString(tbl, "collectd_security_level", &pc.CollectdSecurityLevel)
c.getFieldString(tbl, "collectd_parse_multivalue", &pc.CollectdSplit)
c.getFieldStringSlice(tbl, "collectd_typesdb", &pc.CollectdTypesDB)
// Dropwizard parser options.
c.getFieldString(tbl, "dropwizard_metric_registry_path", &pc.DropwizardMetricRegistryPath)
c.getFieldString(tbl, "dropwizard_time_path", &pc.DropwizardTimePath)
c.getFieldString(tbl, "dropwizard_time_format", &pc.DropwizardTimeFormat)
c.getFieldString(tbl, "dropwizard_tags_path", &pc.DropwizardTagsPath)
c.getFieldStringMap(tbl, "dropwizard_tag_paths", &pc.DropwizardTagPathsMap)
// for grok data_format
c.getFieldStringSlice(tbl, "grok_named_patterns", &pc.GrokNamedPatterns)
c.getFieldStringSlice(tbl, "grok_patterns", &pc.GrokPatterns)
c.getFieldString(tbl, "grok_custom_patterns", &pc.GrokCustomPatterns)
c.getFieldStringSlice(tbl, "grok_custom_pattern_files", &pc.GrokCustomPatternFiles)
c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone)
c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp)
// Form-urlencoded parser option.
c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)
c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
// for influx parser
c.getFieldString(tbl, "influx_parser_type", &pc.InfluxParserType)
// for XPath parser family
if choice.Contains(pc.DataFormat, []string{"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf"}) {
c.getFieldString(tbl, "xpath_protobuf_file", &pc.XPathProtobufFile)
c.getFieldString(tbl, "xpath_protobuf_type", &pc.XPathProtobufType)
c.getFieldStringSlice(tbl, "xpath_protobuf_import_paths", &pc.XPathProtobufImportPaths)
c.getFieldBool(tbl, "xpath_print_document", &pc.XPathPrintDocument)
// Determine the actual xpath configuration tables
node, xpathOK := tbl.Fields["xpath"]
if !xpathOK {
// Add this for backward compatibility
node, xpathOK = tbl.Fields[pc.DataFormat]
}
if xpathOK {
if subtbls, ok := node.([]*ast.Table); ok {
pc.XPathConfig = make([]xpath.Config, len(subtbls))
for i, subtbl := range subtbls {
// Work on a copy and write it back, since getField* takes
// pointers into the local struct.
subcfg := pc.XPathConfig[i]
c.getFieldString(subtbl, "metric_name", &subcfg.MetricQuery)
c.getFieldString(subtbl, "metric_selection", &subcfg.Selection)
c.getFieldString(subtbl, "timestamp", &subcfg.Timestamp)
c.getFieldString(subtbl, "timestamp_format", &subcfg.TimestampFmt)
c.getFieldStringMap(subtbl, "tags", &subcfg.Tags)
c.getFieldStringMap(subtbl, "fields", &subcfg.Fields)
c.getFieldStringMap(subtbl, "fields_int", &subcfg.FieldsInt)
c.getFieldString(subtbl, "field_selection", &subcfg.FieldSelection)
c.getFieldBool(subtbl, "field_name_expansion", &subcfg.FieldNameExpand)
c.getFieldString(subtbl, "field_name", &subcfg.FieldNameQuery)
c.getFieldString(subtbl, "field_value", &subcfg.FieldValueQuery)
c.getFieldString(subtbl, "tag_selection", &subcfg.TagSelection)
c.getFieldBool(subtbl, "tag_name_expansion", &subcfg.TagNameExpand)
c.getFieldString(subtbl, "tag_name", &subcfg.TagNameQuery)
c.getFieldString(subtbl, "tag_value", &subcfg.TagValueQuery)
pc.XPathConfig[i] = subcfg
}
}
}
}
// for JSON_v2 parser
if node, ok := tbl.Fields["json_v2"]; ok {
if metricConfigs, ok := node.([]*ast.Table); ok {
pc.JSONV2Config = make([]json_v2.Config, len(metricConfigs))
for i, metricConfig := range metricConfigs {
// Work on a copy and write it back at the end of the iteration.
mc := pc.JSONV2Config[i]
c.getFieldString(metricConfig, "measurement_name", &mc.MeasurementName)
// Fall back to the plugin name if no measurement name is set.
if mc.MeasurementName == "" {
mc.MeasurementName = name
}
c.getFieldString(metricConfig, "measurement_name_path", &mc.MeasurementNamePath)
c.getFieldString(metricConfig, "timestamp_path", &mc.TimestampPath)
c.getFieldString(metricConfig, "timestamp_format", &mc.TimestampFormat)
c.getFieldString(metricConfig, "timestamp_timezone", &mc.TimestampTimezone)
mc.Fields = getFieldSubtable(c, metricConfig)
mc.Tags = getTagSubtable(c, metricConfig)
// Parse nested [[...json_v2.object]] tables.
if objectconfigs, ok := metricConfig.Fields["object"]; ok {
if objectconfigs, ok := objectconfigs.([]*ast.Table); ok {
for _, objectConfig := range objectconfigs {
var o json_v2.Object
c.getFieldString(objectConfig, "path", &o.Path)
c.getFieldBool(objectConfig, "optional", &o.Optional)
c.getFieldString(objectConfig, "timestamp_key", &o.TimestampKey)
c.getFieldString(objectConfig, "timestamp_format", &o.TimestampFormat)
c.getFieldString(objectConfig, "timestamp_timezone", &o.TimestampTimezone)
c.getFieldBool(objectConfig, "disable_prepend_keys", &o.DisablePrependKeys)
c.getFieldStringSlice(objectConfig, "included_keys", &o.IncludedKeys)
c.getFieldStringSlice(objectConfig, "excluded_keys", &o.ExcludedKeys)
c.getFieldStringSlice(objectConfig, "tags", &o.Tags)
c.getFieldStringMap(objectConfig, "renames", &o.Renames)
c.getFieldStringMap(objectConfig, "fields", &o.Fields)
o.FieldPaths = getFieldSubtable(c, objectConfig)
o.TagPaths = getTagSubtable(c, objectConfig)
mc.JSONObjects = append(mc.JSONObjects, o)
}
}
}
pc.JSONV2Config[i] = mc
}
}
}
pc.MetricName = name
// Report the first accumulated field-parsing error, if any.
if c.hasErrs() {
return nil, c.firstErr()
}
return pc, nil
}
func getFieldSubtable(c *Config, metricConfig *ast.Table) []json_v2.DataSet {
var fields []json_v2.DataSet
@ -1425,12 +1223,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags":
// Parser options to ignore
case "data_type", "separator", "tag_keys",
// "templates", // shared with serializers
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp",
"influx_parser_type",
"value_field_name":
case "data_type":
// Serializer options to ignore
case "prefix", "template", "templates",

View File

@ -23,13 +23,14 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing
"github.com/influxdata/telegraf/plugins/parsers/json"
)
func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
c := NewConfig()
require.NoError(t, os.Setenv("MY_TEST_SERVER", "192.168.1.1"))
require.NoError(t, os.Setenv("TEST_INTERVAL", "10s"))
c.LoadConfig("./testdata/single_plugin_env_vars.toml")
require.NoError(t, c.LoadConfig("./testdata/single_plugin_env_vars.toml"))
input := inputs.Inputs["memcached"]().(*MockupInputPlugin)
input.Servers = []string{"192.168.1.1"}
@ -146,18 +147,13 @@ func TestConfig_LoadDirectory(t *testing.T) {
expectedConfigs[0].Tags = make(map[string]string)
expectedPlugins[1] = inputs.Inputs["exec"]().(*MockupInputPlugin)
parserConfig := &parsers.Config{
parser := &json.Parser{
MetricName: "exec",
DataFormat: "json",
JSONStrict: true,
Strict: true,
}
p, err := parsers.NewParser(parserConfig)
require.NoError(t, err)
require.NoError(t, parser.Init())
// Inject logger to have proper struct for comparison
models.SetLoggerOnPlugin(p, models.NewLogger("parsers", parserConfig.DataFormat, parserConfig.MetricName))
expectedPlugins[1].SetParser(p)
expectedPlugins[1].SetParser(parser)
expectedPlugins[1].Command = "/usr/bin/myothercollector --foo=bar"
expectedConfigs[1] = &models.InputConfig{
Name: "exec",
@ -208,11 +204,27 @@ func TestConfig_LoadDirectory(t *testing.T) {
require.NotNil(t, input.Log)
input.Log = nil
// Ignore the parser if not expected
if expectedPlugins[i].parser == nil {
input.parser = nil
// Check the parsers if any
if expectedPlugins[i].parser != nil {
runningParser, ok := input.parser.(*models.RunningParser)
require.True(t, ok)
// We only use the JSON parser here
parser, ok := runningParser.Parser.(*json.Parser)
require.True(t, ok)
// Prepare parser for comparison
require.NoError(t, parser.Init())
parser.Log = nil
// Compare the parser
require.Equalf(t, expectedPlugins[i].parser, parser, "Plugin %d: incorrect parser produced", i)
}
// Ignore the parsers for further comparisons
input.parser = nil
expectedPlugins[i].parser = nil
require.Equalf(t, expectedPlugins[i], plugin.Input, "Plugin %d: incorrect struct produced", i)
require.Equalf(t, expectedConfigs[i], plugin.Config, "Plugin %d: incorrect config produced", i)
}
@ -430,38 +442,22 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
logger := models.NewLogger("parsers", format, cfg.MetricName)
// Try with the new format
if creator, found := parsers.Parsers[format]; found {
t.Logf("using new format parser for %q...", format)
parserNew := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parserNew))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parserNew, logger)
if p, ok := parserNew.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserNew)
continue
}
creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)
// Try with the old format
parserOld, err := parsers.NewParser(formatCfg)
if err == nil {
t.Logf("using old format parser for %q...", format)
models.SetLoggerOnPlugin(parserOld, logger)
if p, ok := parserOld.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
parser := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parser))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
expected = append(expected, parserOld)
continue
}
require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err)
require.Failf(t, "%q neither found in old nor new format", format)
models.SetLoggerOnPlugin(parser, logger)
if p, ok := parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parser)
}
require.Len(t, expected, len(formats))
@ -567,38 +563,22 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
logger := models.NewLogger("parsers", format, cfg.MetricName)
// Try with the new format
if creator, found := parsers.Parsers[format]; found {
t.Logf("using new format parser for %q...", format)
parserNew := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parserNew))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parserNew, logger)
if p, ok := parserNew.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserNew)
continue
}
creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)
// Try with the old format
parserOld, err := parsers.NewParser(formatCfg)
if err == nil {
t.Logf("using old format parser for %q...", format)
models.SetLoggerOnPlugin(parserOld, logger)
if p, ok := parserOld.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
parser := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parser))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
expected = append(expected, parserOld)
continue
}
require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err)
require.Failf(t, "%q neither found in old nor new format", format)
models.SetLoggerOnPlugin(parser, logger)
if p, ok := parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parser)
}
require.Len(t, expected, len(formats))

View File

@ -425,14 +425,13 @@ func TestParseCompleteFile(t *testing.T) {
require.NoError(t, err)
r.Log = testutil.Logger{}
parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "name",
TagKeys: []string{"tag1"},
}
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
parser := &json.Parser{
NameKey: "name",
TagKeys: []string{"tag1"},
}
err := parser.Init()
return parser, err
})
testJSON := `{

View File

@ -16,8 +16,8 @@ import (
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/common/oauth"
httpplugin "github.com/influxdata/telegraf/plugins/inputs/http"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
@ -295,7 +295,9 @@ func TestBodyAndContentEncoding(t *testing.T) {
})
tt.plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{DataFormat: "influx"})
parser := &influx.Parser{}
err := parser.Init()
return parser, err
})
var acc testutil.Accumulator

View File

@ -231,7 +231,10 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
// init registers the JSON parser under the "json" data-format name.
// Strict mode defaults to true so invalid JSON documents are rejected,
// matching the historic JSONStrict default of the legacy config path.
//
// NOTE: the original span contained diff residue (an old single-line
// return immediately followed by the new multi-line return); this is the
// reconstructed post-change version with exactly one return statement.
func init() {
	parsers.Add("json",
		func(defaultMetricName string) telegraf.Parser {
			return &Parser{
				MetricName: defaultMetricName,
				Strict:     true,
			}
		})
}

View File

@ -188,24 +188,20 @@ type Config struct {
}
// NewParser returns a Parser interface based on the given config.
// DEPRECATED: Please instantiate the parser directly instead of using this function.
func NewParser(config *Config) (Parser, error) {
var err error
var parser Parser
switch config.DataFormat {
default:
creator, found := Parsers[config.DataFormat]
if !found {
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
}
// Try to create new-style parsers the old way...
// DEPRECATED: Please instantiate the parser directly instead of using this function.
parser = creator(config.MetricName)
p, ok := parser.(ParserCompatibility)
if !ok {
return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat)
}
err = p.InitFromConfig(config)
creator, found := Parsers[config.DataFormat]
if !found {
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
}
// Try to create new-style parsers the old way...
parser := creator(config.MetricName)
p, ok := parser.(ParserCompatibility)
if !ok {
return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat)
}
err := p.InitFromConfig(config)
return parser, err
}

View File

@ -20,6 +20,7 @@ func TestRegistry_BackwardCompatibility(t *testing.T) {
CSVHeaderRowCount: 42,
XPathProtobufFile: "xpath/testcases/protos/addressbook.proto",
XPathProtobufType: "addressbook.AddressBook",
JSONStrict: true,
}
// Some parsers need certain settings to not error. Furthermore, we