feat: Parser plugin restructuring (#8791)

Sven Rebhan 2022-01-12 23:54:42 +01:00 committed by GitHub
parent 4e6ff025dc
commit 193dc450c3
19 changed files with 1321 additions and 446 deletions


@ -192,6 +192,13 @@ func (a *Agent) initPlugins() error {
input.LogName(), err)
}
}
for _, parser := range a.Config.Parsers {
err := parser.Init()
if err != nil {
return fmt.Errorf("could not initialize parser %s::%s: %v",
parser.Config.DataFormat, parser.Config.Parent, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {


@ -31,6 +31,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"gopkg.in/tomb.v1"
)


@ -76,6 +76,7 @@ type Config struct {
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
Parsers []*models.RunningParser
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
@ -103,6 +104,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Parsers: make([]*models.RunningParser, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
@ -233,6 +235,15 @@ func (c *Config) AggregatorNames() []string {
return PluginNameCounts(name)
}
// ParserNames returns a list of strings of the configured parsers.
func (c *Config) ParserNames() []string {
var name []string
for _, parser := range c.Parsers {
name = append(name, parser.Config.DataFormat)
}
return PluginNameCounts(name)
}
// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
var name []string
@ -1048,6 +1059,39 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
return nil
}
func (c *Config) probeParser(table *ast.Table) bool {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
_, ok := parsers.Parsers[dataformat]
return ok
}
func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
creator, ok := parsers.Parsers[dataformat]
if !ok {
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
}
parser := creator(parentname)
conf, err := c.buildParser(parentname, table)
if err != nil {
return nil, err
}
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}
running := models.NewRunningParser(parser, conf)
c.Parsers = append(c.Parsers, running)
return running, nil
}
func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
@ -1162,6 +1206,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
name = "diskio"
}
// For inputs with parsers we need to compute the set of
// options covered by neither the parser nor the input.
// We do this by keeping a local tally of missing entries,
// counting the number of misses per option. If the input
// has a parser, both have to miss an entry for it to count
// as truly unknown. The misses are evaluated at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
creator, ok := inputs.Inputs[name]
if !ok {
// Handle removed, deprecated plugins
@ -1174,35 +1229,95 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
input := creator()
// If the input has a SetParser function, then this means it can accept
// arbitrary types of input, so build the parser and set it.
if t, ok := input.(parsers.ParserInput); ok {
parser, err := c.buildParser(name, table)
if err != nil {
return err
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fall back to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
}
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fall back to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
}
if t, ok := input.(telegraf.ParserFuncInput); ok {
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fall back to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.buildParserOld(name, config)
})
}
t.SetParser(parser)
}
if t, ok := input.(parsers.ParserFuncInput); ok {
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := parsers.NewParser(config)
if err != nil {
return nil, err
}
logger := models.NewLogger("parsers", config.DataFormat, name)
models.SetLoggerOnPlugin(parser, logger)
if initializer, ok := parser.(telegraf.Initializer); ok {
if err := initializer.Init(); err != nil {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fall back to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
return parser, nil
})
t.SetParserFunc(func() (parsers.Parser, error) {
return c.buildParserOld(name, config)
})
}
}
pluginConfig, err := c.buildInput(name, table)
@ -1221,6 +1336,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)
// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
return err
}
}
return nil
}
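To make the miss bookkeeping above concrete, here is a self-contained sketch (plain Go, not part of the commit; the keys are illustrative). A key consumed by either the parser or the input is missed at most once and stays at or under the threshold of 1; only a key missed by both sides exceeds it and gets forwarded to missingTomlField. Without a parser, the threshold stays 0 and every miss is forwarded, preserving the old behavior.

package main

import "fmt"

func main() {
	// With a parser attached, missThreshold is 1: a key missed only once
	// was consumed by the other side (parser or input) and is tolerated.
	missThreshold := 1
	missCount := map[string]int{
		"csv_header_row_count": 1, // missed by the input, consumed by the parser
		"servers":              1, // missed by the parser, consumed by the input
		"no_such_option":       2, // missed by both -> reported
	}
	for key, count := range missCount {
		if count <= missThreshold {
			continue
		}
		fmt.Printf("unrecognized option %q\n", key) // only no_such_option
	}
}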
@ -1265,6 +1391,21 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
return conf, nil
}
// buildParser parses parser-specific items from the ast.Table
// and returns a models.ParserConfig to be inserted into a
// models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) {
var dataformat string
c.getFieldString(tbl, "data_format", &dataformat)
conf := &models.ParserConfig{
Parent: name,
DataFormat: dataformat,
}
return conf, nil
}
// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
@ -1353,14 +1494,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
return cp, nil
}
// buildParser grabs the necessary entries from the ast.Table for creating
// buildParserOld grabs the necessary entries from the ast.Table for creating
// a parsers.Parser object, and creates it, which can then be added onto
// an Input object.
func (c *Config) buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
config, err := c.getParserConfig(name, tbl)
if err != nil {
return nil, err
}
func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) {
parser, err := parsers.NewParser(config)
if err != nil {
return nil, err
@ -1422,23 +1559,6 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone)
c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp)
//for csv parser
c.getFieldStringSlice(tbl, "csv_column_names", &pc.CSVColumnNames)
c.getFieldStringSlice(tbl, "csv_column_types", &pc.CSVColumnTypes)
c.getFieldStringSlice(tbl, "csv_tag_columns", &pc.CSVTagColumns)
c.getFieldString(tbl, "csv_timezone", &pc.CSVTimezone)
c.getFieldString(tbl, "csv_delimiter", &pc.CSVDelimiter)
c.getFieldString(tbl, "csv_comment", &pc.CSVComment)
c.getFieldString(tbl, "csv_measurement_column", &pc.CSVMeasurementColumn)
c.getFieldString(tbl, "csv_timestamp_column", &pc.CSVTimestampColumn)
c.getFieldString(tbl, "csv_timestamp_format", &pc.CSVTimestampFormat)
c.getFieldInt(tbl, "csv_header_row_count", &pc.CSVHeaderRowCount)
c.getFieldInt(tbl, "csv_skip_rows", &pc.CSVSkipRows)
c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns)
c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace)
c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues)
c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors)
c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)
c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
@ -1652,9 +1772,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
@ -1679,6 +1796,22 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
return nil
}
func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
f := func(_ reflect.Type, key string) error {
if c, ok := counter[key]; ok {
counter[key] = c + 1
} else {
counter[key] = 1
}
return nil
}
c.toml.MissingField = f
}
func (c *Config) resetMissingTomlFieldTracker() {
c.toml.MissingField = c.missingTomlField
}
func (c *Config) getFieldString(tbl *ast.Table, fieldName string, target *string) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {


@ -5,6 +5,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"reflect"
"runtime"
"strings"
"testing"
@ -18,6 +19,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing
)
func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
@ -359,6 +361,370 @@ func TestConfig_URLLikeFileName(t *testing.T) {
}
}
func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
formats := []string{
"collectd",
"csv",
"dropwizard",
"form_urlencoded",
"graphite",
"grok",
"influx",
"json",
"json_v2",
"logfmt",
"nagios",
"prometheus",
"prometheusremotewrite",
"value",
"wavefront",
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/parsers_new.toml"))
require.Len(t, c.Inputs, len(formats))
cfg := parsers.Config{
CSVHeaderRowCount: 42,
DropwizardTagPathsMap: make(map[string]string),
GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"},
JSONStrict: true,
MetricName: "parser_test_new",
}
override := map[string]struct {
cfg *parsers.Config
param map[string]interface{}
mask []string
}{
"csv": {
param: map[string]interface{}{
"HeaderRowCount": cfg.CSVHeaderRowCount,
},
mask: []string{"TimeFunc", "Log"},
},
"json_v2": {
mask: []string{"Log"},
},
"logfmt": {
mask: []string{"Now"},
},
"xml": {
mask: []string{"Log"},
},
"xpath_json": {
mask: []string{"Log"},
},
"xpath_msgpack": {
mask: []string{"Log"},
},
"xpath_protobuf": {
cfg: &parsers.Config{
XPathProtobufFile: "testdata/addressbook.proto",
XPathProtobufType: "addressbook.AddressBook",
},
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
"ProtobufMessageType": "addressbook.AddressBook",
},
mask: []string{"Log"},
},
}
expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
settings, hasOverride := override[format]
if hasOverride && settings.cfg != nil {
formatCfg = settings.cfg
}
formatCfg.DataFormat = format
logger := models.NewLogger("parsers", format, cfg.MetricName)
// Try with the new format
if creator, found := parsers.Parsers[format]; found {
t.Logf("using new format parser for %q...", format)
parserNew := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parserNew))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parserNew, logger)
if p, ok := parserNew.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserNew)
continue
}
// Try with the old format
parserOld, err := parsers.NewParser(formatCfg)
if err == nil {
t.Logf("using old format parser for %q...", format)
models.SetLoggerOnPlugin(parserOld, logger)
if p, ok := parserOld.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserOld)
continue
}
require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err)
require.Failf(t, "%q found in neither old nor new format", format)
}
require.Len(t, expected, len(formats))
actual := make([]interface{}, 0)
generated := make([]interface{}, 0)
for _, plugin := range c.Inputs {
input, ok := plugin.Input.(*MockupInputPluginParserNew)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
}
// Get the parser set with 'SetParserFunc()'
g, err := input.ParserFunc()
require.NoError(t, err)
if rp, ok := g.(*models.RunningParser); ok {
generated = append(generated, rp.Parser)
} else {
generated = append(generated, g)
}
}
require.Len(t, actual, len(formats))
for i, format := range formats {
if settings, found := override[format]; found {
a := reflect.Indirect(reflect.ValueOf(actual[i]))
e := reflect.Indirect(reflect.ValueOf(expected[i]))
g := reflect.Indirect(reflect.ValueOf(generated[i]))
for _, key := range settings.mask {
af := a.FieldByName(key)
ef := e.FieldByName(key)
gf := g.FieldByName(key)
v := reflect.Zero(ef.Type())
af.Set(v)
ef.Set(v)
gf.Set(v)
}
}
// We need special handling for some parsers as they internally contain
// pointers to other structs that inherently differ between instances
switch format {
case "dropwizard", "grok", "influx", "wavefront":
// At least check if we have the same type
require.IsType(t, expected[i], actual[i])
require.IsType(t, expected[i], generated[i])
continue
}
require.EqualValuesf(t, expected[i], actual[i], "in SetParser() for %q", format)
require.EqualValuesf(t, expected[i], generated[i], "in SetParserFunc() for %q", format)
}
}
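The mask handling in this test zeroes out fields that legitimately differ between instances (function pointers, loggers) before comparing. A minimal, self-contained sketch of that reflection trick (the demo type is hypothetical, not part of the commit):

package main

import (
	"fmt"
	"reflect"
)

type demo struct {
	Name string
	Now  func() int64 // differs per instance, must be masked
}

func main() {
	a := &demo{Name: "x", Now: func() int64 { return 1 }}
	b := &demo{Name: "x", Now: func() int64 { return 2 }}
	// Zero the masked field on both sides, as the test loop above does.
	for _, v := range []*demo{a, b} {
		f := reflect.Indirect(reflect.ValueOf(v)).FieldByName("Now")
		f.Set(reflect.Zero(f.Type()))
	}
	fmt.Println(reflect.DeepEqual(a, b)) // true once masked
}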
func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
formats := []string{
"collectd",
"csv",
"dropwizard",
"form_urlencoded",
"graphite",
"grok",
"influx",
"json",
"json_v2",
"logfmt",
"nagios",
"prometheus",
"prometheusremotewrite",
"value",
"wavefront",
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/parsers_old.toml"))
require.Len(t, c.Inputs, len(formats))
cfg := parsers.Config{
CSVHeaderRowCount: 42,
DropwizardTagPathsMap: make(map[string]string),
GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"},
JSONStrict: true,
MetricName: "parser_test_old",
}
override := map[string]struct {
cfg *parsers.Config
param map[string]interface{}
mask []string
}{
"csv": {
param: map[string]interface{}{
"HeaderRowCount": cfg.CSVHeaderRowCount,
},
mask: []string{"TimeFunc", "Log"},
},
"json_v2": {
mask: []string{"Log"},
},
"logfmt": {
mask: []string{"Now"},
},
"xml": {
mask: []string{"Log"},
},
"xpath_json": {
mask: []string{"Log"},
},
"xpath_msgpack": {
mask: []string{"Log"},
},
"xpath_protobuf": {
cfg: &parsers.Config{
XPathProtobufFile: "testdata/addressbook.proto",
XPathProtobufType: "addressbook.AddressBook",
},
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
"ProtobufMessageType": "addressbook.AddressBook",
},
mask: []string{"Log"},
},
}
expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
settings, hasOverride := override[format]
if hasOverride && settings.cfg != nil {
formatCfg = settings.cfg
}
formatCfg.DataFormat = format
logger := models.NewLogger("parsers", format, cfg.MetricName)
// Try with the new format
if creator, found := parsers.Parsers[format]; found {
t.Logf("using new format parser for %q...", format)
parserNew := creator(formatCfg.MetricName)
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parserNew))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parserNew, logger)
if p, ok := parserNew.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserNew)
continue
}
// Try with the old format
parserOld, err := parsers.NewParser(formatCfg)
if err == nil {
t.Logf("using old format parser for %q...", format)
models.SetLoggerOnPlugin(parserOld, logger)
if p, ok := parserOld.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parserOld)
continue
}
require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err)
require.Failf(t, "%q found in neither old nor new format", format)
}
require.Len(t, expected, len(formats))
actual := make([]interface{}, 0)
generated := make([]interface{}, 0)
for _, plugin := range c.Inputs {
input, ok := plugin.Input.(*MockupInputPluginParserOld)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := input.Parser.(*models.RunningParser); ok {
actual = append(actual, p.Parser)
} else {
actual = append(actual, input.Parser)
}
// Get the parser set with 'SetParserFunc()'
g, err := input.ParserFunc()
require.NoError(t, err)
if rp, ok := g.(*models.RunningParser); ok {
generated = append(generated, rp.Parser)
} else {
generated = append(generated, g)
}
}
require.Len(t, actual, len(formats))
for i, format := range formats {
if settings, found := override[format]; found {
a := reflect.Indirect(reflect.ValueOf(actual[i]))
e := reflect.Indirect(reflect.ValueOf(expected[i]))
g := reflect.Indirect(reflect.ValueOf(generated[i]))
for _, key := range settings.mask {
af := a.FieldByName(key)
ef := e.FieldByName(key)
gf := g.FieldByName(key)
v := reflect.Zero(ef.Type())
af.Set(v)
ef.Set(v)
gf.Set(v)
}
}
// We need special handling for some parsers as they internally contain
// pointers to other structs that inherently differ between instances
switch format {
case "dropwizard", "grok", "influx", "wavefront":
// At least check if we have the same type
require.IsType(t, expected[i], actual[i])
require.IsType(t, expected[i], generated[i])
continue
}
require.EqualValuesf(t, expected[i], actual[i], "in SetParser() for %q", format)
require.EqualValuesf(t, expected[i], generated[i], "in SetParserFunc() for %q", format)
}
}
/*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/
type MockupInputPluginParserOld struct {
Parser parsers.Parser
ParserFunc parsers.ParserFunc
}
func (m *MockupInputPluginParserOld) SampleConfig() string { return "Mockup old parser test plugin" }
func (m *MockupInputPluginParserOld) Description() string { return "Mockup old parser test plugin" }
func (m *MockupInputPluginParserOld) Gather(acc telegraf.Accumulator) error { return nil }
func (m *MockupInputPluginParserOld) SetParser(parser parsers.Parser) { m.Parser = parser }
func (m *MockupInputPluginParserOld) SetParserFunc(f parsers.ParserFunc) { m.ParserFunc = f }
/*** Mockup INPUT plugin for (new) parser testing to avoid cyclic dependencies ***/
type MockupInputPluginParserNew struct {
Parser telegraf.Parser
ParserFunc telegraf.ParserFunc
}
func (m *MockupInputPluginParserNew) SampleConfig() string { return "Mockup new parser test plugin" }
func (m *MockupInputPluginParserNew) Description() string { return "Mockup new parser test plugin" }
func (m *MockupInputPluginParserNew) Gather(acc telegraf.Accumulator) error { return nil }
func (m *MockupInputPluginParserNew) SetParser(parser telegraf.Parser) { m.Parser = parser }
func (m *MockupInputPluginParserNew) SetParserFunc(f telegraf.ParserFunc) { m.ParserFunc = f }
/*** Mockup INPUT plugin for testing to avoid cyclic dependencies ***/
type MockupInputPlugin struct {
Servers []string `toml:"servers"`
@ -373,13 +739,13 @@ type MockupInputPlugin struct {
Log telegraf.Logger `toml:"-"`
tls.ServerConfig
parser parsers.Parser
parser telegraf.Parser
}
func (m *MockupInputPlugin) SampleConfig() string { return "Mockup test intput plugin" }
func (m *MockupInputPlugin) Description() string { return "Mockup test intput plugin" }
func (m *MockupInputPlugin) SampleConfig() string { return "Mockup test input plugin" }
func (m *MockupInputPlugin) Description() string { return "Mockup test input plugin" }
func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { return nil }
func (m *MockupInputPlugin) SetParser(parser parsers.Parser) { m.parser = parser }
func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { m.parser = parser }
/*** Mockup OUTPUT plugin for testing to avoid cyclic dependencies ***/
type MockupOuputPlugin struct {
@ -400,6 +766,8 @@ func (m *MockupOuputPlugin) Write(metrics []telegraf.Metric) error { return nil
// Register the mockup plugin on loading
func init() {
// Register the mockup input plugin for the required names
inputs.Add("parser_test_new", func() telegraf.Input { return &MockupInputPluginParserNew{} })
inputs.Add("parser_test_old", func() telegraf.Input { return &MockupInputPluginParserOld{} })
inputs.Add("exec", func() telegraf.Input { return &MockupInputPlugin{Timeout: Duration(time.Second * 5)} })
inputs.Add("http_listener_v2", func() telegraf.Input { return &MockupInputPlugin{} })
inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} })

config/testdata/addressbook.proto (new file, 28 lines)

@ -0,0 +1,28 @@
syntax = "proto3";
package addressbook;
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;
uint32 age = 4;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
string number = 1;
PhoneType type = 2;
}
repeated PhoneNumber phones = 5;
}
message AddressBook {
repeated Person people = 1;
repeated string tags = 2;
}

config/testdata/parsers_new.toml (new file, 60 lines)

@ -0,0 +1,60 @@
[[inputs.parser_test_new]]
data_format = "collectd"
[[inputs.parser_test_new]]
data_format = "csv"
csv_header_row_count = 42
[[inputs.parser_test_new]]
data_format = "dropwizard"
[[inputs.parser_test_new]]
data_format = "form_urlencoded"
[[inputs.parser_test_new]]
data_format = "graphite"
[[inputs.parser_test_new]]
data_format = "grok"
grok_patterns = ["%{COMBINED_LOG_FORMAT}"]
[[inputs.parser_test_new]]
data_format = "influx"
[[inputs.parser_test_new]]
data_format = "json"
[[inputs.parser_test_new]]
data_format = "json_v2"
[[inputs.parser_test_new]]
data_format = "logfmt"
[[inputs.parser_test_new]]
data_format = "nagios"
[[inputs.parser_test_new]]
data_format = "prometheus"
[[inputs.parser_test_new]]
data_format = "prometheusremotewrite"
[[inputs.parser_test_new]]
data_format = "value"
[[inputs.parser_test_new]]
data_format = "wavefront"
[[inputs.parser_test_new]]
data_format = "xml"
[[inputs.parser_test_new]]
data_format = "xpath_json"
[[inputs.parser_test_new]]
data_format = "xpath_msgpack"
[[inputs.parser_test_new]]
data_format = "xpath_protobuf"
xpath_protobuf_file = "testdata/addressbook.proto"
xpath_protobuf_type = "addressbook.AddressBook"

config/testdata/parsers_old.toml (new file, 60 lines)

@ -0,0 +1,60 @@
[[inputs.parser_test_old]]
data_format = "collectd"
[[inputs.parser_test_old]]
data_format = "csv"
csv_header_row_count = 42
[[inputs.parser_test_old]]
data_format = "dropwizard"
[[inputs.parser_test_old]]
data_format = "form_urlencoded"
[[inputs.parser_test_old]]
data_format = "graphite"
[[inputs.parser_test_old]]
data_format = "grok"
grok_patterns = ["%{COMBINED_LOG_FORMAT}"]
[[inputs.parser_test_old]]
data_format = "influx"
[[inputs.parser_test_old]]
data_format = "json"
[[inputs.parser_test_old]]
data_format = "json_v2"
[[inputs.parser_test_old]]
data_format = "logfmt"
[[inputs.parser_test_old]]
data_format = "nagios"
[[inputs.parser_test_old]]
data_format = "prometheus"
[[inputs.parser_test_old]]
data_format = "prometheusremotewrite"
[[inputs.parser_test_old]]
data_format = "value"
[[inputs.parser_test_old]]
data_format = "wavefront"
[[inputs.parser_test_old]]
data_format = "xml"
[[inputs.parser_test_old]]
data_format = "xpath_json"
[[inputs.parser_test_old]]
data_format = "xpath_msgpack"
[[inputs.parser_test_old]]
data_format = "xpath_protobuf"
xpath_protobuf_file = "testdata/addressbook.proto"
xpath_protobuf_type = "addressbook.AddressBook"

models/running_parsers.go (new file, 97 lines)

@ -0,0 +1,97 @@
package models
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
type RunningParser struct {
Parser telegraf.Parser
Config *ParserConfig
log telegraf.Logger
MetricsParsed selfstat.Stat
ParseTime selfstat.Stat
}
func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningParser {
tags := map[string]string{"type": config.DataFormat}
if config.Alias != "" {
tags["alias"] = config.Alias
}
parserErrorsRegister := selfstat.Register("parser", "errors", tags)
logger := NewLogger("parsers", config.DataFormat+"::"+config.Parent, config.Alias)
logger.OnErr(func() {
parserErrorsRegister.Incr(1)
})
SetLoggerOnPlugin(parser, logger)
return &RunningParser{
Parser: parser,
Config: config,
MetricsParsed: selfstat.Register(
"parser",
"metrics_parsed",
tags,
),
ParseTime: selfstat.Register(
"parser",
"parse_time_ns",
tags,
),
log: logger,
}
}
// ParserConfig is the common config for all parsers.
type ParserConfig struct {
Parent string
Alias string
DataFormat string
DefaultTags map[string]string
}
func (r *RunningParser) LogName() string {
return logName("parsers", r.Config.DataFormat+"::"+r.Config.Parent, r.Config.Alias)
}
func (r *RunningParser) Init() error {
if p, ok := r.Parser.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningParser) Parse(buf []byte) ([]telegraf.Metric, error) {
start := time.Now()
m, err := r.Parser.Parse(buf)
elapsed := time.Since(start)
r.ParseTime.Incr(elapsed.Nanoseconds())
r.MetricsParsed.Incr(int64(len(m)))
return m, err
}
func (r *RunningParser) ParseLine(line string) (telegraf.Metric, error) {
start := time.Now()
m, err := r.Parser.ParseLine(line)
elapsed := time.Since(start)
r.ParseTime.Incr(elapsed.Nanoseconds())
r.MetricsParsed.Incr(1)
return m, err
}
func (r *RunningParser) SetDefaultTags(tags map[string]string) {
r.Parser.SetDefaultTags(tags)
}
func (r *RunningParser) Log() telegraf.Logger {
return r.log
}
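A usage sketch for the new wrapper (hypothetical consumer code; the csv parser and the field values are just an example):

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	// Wrapping a new-style parser accounts parse time and metric counts
	// via selfstat and wires up the shared logger, as implemented above.
	p := &csv.Parser{MetricName: "demo", HeaderRowCount: 1}
	rp := models.NewRunningParser(p, &models.ParserConfig{
		Parent:     "file",
		DataFormat: "csv",
	})
	if err := rp.Init(); err != nil {
		panic(err)
	}
	metrics, err := rp.Parse([]byte("a,b\n1,2\n"))
	fmt.Println(len(metrics), err, rp.LogName()) // e.g. 1 <nil> parsers.csv::file
}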

parser.go (new file, 39 lines)

@ -0,0 +1,39 @@
package telegraf
// Parser is an interface defining functions that a parser plugin must satisfy.
type Parser interface {
// Parse takes a byte buffer separated by newlines
// e.g., `cpu.usage.idle 90\ncpu.usage.busy 10`
// and parses it into telegraf metrics
//
// Must be thread-safe.
Parse(buf []byte) ([]Metric, error)
// ParseLine takes a single string metric
// e.g., "cpu.usage.idle 90"
// and parses it into a telegraf metric.
//
// Must be thread-safe.
ParseLine(line string) (Metric, error)
// SetDefaultTags tells the parser to add all of the given tags
// to each parsed metric.
// NOTE: do _not_ modify the map after you've passed it here!
SetDefaultTags(tags map[string]string)
}
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse
// arbitrary data formats.
type ParserInput interface {
// SetParser sets the parser to be used by the plugin
SetParser(parser Parser)
}
// ParserFuncInput is an interface for input plugins that are able to parse
// arbitrary data formats.
type ParserFuncInput interface {
// SetParserFunc stores a factory that returns a new parser.
SetParserFunc(fn ParserFunc)
}
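For illustration, a minimal hypothetical implementation of this interface (not part of the commit; it assumes metric.New returns a telegraf.Metric, as in current versions of this repository):

package sample

import (
	"bytes"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

// wordCount emits one metric per non-empty line, carrying its word count.
type wordCount struct {
	defaultTags map[string]string
}

func (p *wordCount) Parse(buf []byte) ([]telegraf.Metric, error) {
	var metrics []telegraf.Metric
	for _, line := range bytes.Split(buf, []byte("\n")) {
		if len(bytes.TrimSpace(line)) == 0 {
			continue
		}
		m, err := p.ParseLine(string(line))
		if err != nil {
			return nil, err
		}
		metrics = append(metrics, m)
	}
	return metrics, nil
}

func (p *wordCount) ParseLine(line string) (telegraf.Metric, error) {
	fields := map[string]interface{}{"words": len(strings.Fields(line))}
	return metric.New("words", p.defaultTags, fields, time.Now()), nil
}

func (p *wordCount) SetDefaultTags(tags map[string]string) {
	p.defaultTags = tags
}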


@ -9,6 +9,7 @@ import (
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/testutil"
)
@ -35,13 +36,12 @@ func TestCSVGZImport(t *testing.T) {
err = r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 1,
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
parser := csv.Parser{
HeaderRowCount: 1,
}
err := parser.Init()
return &parser, err
})
r.Log = testutil.Logger{}
@ -215,15 +215,14 @@ func TestCSVNoSkipRows(t *testing.T) {
err = r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 1,
CSVSkipRows: 0,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
parser := csv.Parser{
HeaderRowCount: 1,
SkipRows: 0,
TagColumns: []string{"line1"},
}
err := parser.Init()
return &parser, err
})
r.Log = testutil.Logger{}
@ -288,15 +287,14 @@ func TestCSVSkipRows(t *testing.T) {
err = r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 1,
CSVSkipRows: 2,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
parser := csv.Parser{
HeaderRowCount: 1,
SkipRows: 2,
TagColumns: []string{"line1"},
}
err := parser.Init()
return &parser, err
})
r.Log = testutil.Logger{}
@ -363,14 +361,13 @@ func TestCSVMultiHeader(t *testing.T) {
err = r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 2,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
parser := csv.Parser{
HeaderRowCount: 2,
TagColumns: []string{"line1"},
}
err := parser.Init()
return &parser, err
})
r.Log = testutil.Logger{}


@ -183,7 +183,7 @@ func TestCharacterEncoding(t *testing.T) {
tests := []struct {
name string
plugin *File
csv *csv.Config
csv *csv.Parser
file string
}{
{
@ -192,7 +192,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "",
},
csv: &csv.Config{
csv: &csv.Parser{
MetricName: "file",
SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -205,7 +205,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "utf-8",
},
csv: &csv.Config{
csv: &csv.Parser{
MetricName: "file",
SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -218,7 +218,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-16le.csv"},
CharacterEncoding: "utf-16le",
},
csv: &csv.Config{
csv: &csv.Parser{
MetricName: "file",
SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -231,7 +231,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-16be.csv"},
CharacterEncoding: "utf-16be",
},
csv: &csv.Config{
csv: &csv.Parser{
MetricName: "file",
SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -244,8 +244,8 @@ func TestCharacterEncoding(t *testing.T) {
err := tt.plugin.Init()
require.NoError(t, err)
parser, err := csv.NewParser(tt.csv)
require.NoError(t, err)
parser := tt.csv
require.NoError(t, parser.Init())
tt.plugin.SetParser(parser)
var acc testutil.Accumulator


@ -96,7 +96,7 @@ var sampleConfig = `
"telegraf/+/mem",
"sensors/#",
]
# topic_fields = "_/_/_/temperature"
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
@ -142,14 +142,14 @@ var sampleConfig = `
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## [[inputs.mqtt_consumer.topic_parsing]]
## topic = ""
## measurement = ""
## tags = ""
## fields = ""
## [inputs.mqtt_consumer.topic_parsing.types]
##
`
func (m *MQTTConsumer) SampleConfig() string {


@ -301,11 +301,13 @@ cpu,42
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return csv.NewParser(&csv.Config{
parser := csv.Parser{
MeasurementColumn: "measurement",
HeaderRowCount: 1,
TimeFunc: func() time.Time { return time.Unix(0, 0) },
})
}
err := parser.Init()
return &parser, err
})
err = plugin.Init()
@ -360,13 +362,15 @@ skip2,mem,100
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return csv.NewParser(&csv.Config{
parser := csv.Parser{
MeasurementColumn: "measurement1",
HeaderRowCount: 2,
SkipRows: 1,
SkipColumns: 1,
TimeFunc: func() time.Time { return time.Unix(0, 0) },
})
}
err := parser.Init()
return &parser, err
})
err = plugin.Init()


@ -0,0 +1,6 @@
package all
import (
// Blank imports for plugins to register themselves
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
)
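The blank import of this package (alongside the inputs/outputs/processors registries imported earlier in this commit) is what fills the parser registry: each migrated parser registers itself in init(). A small hypothetical probe:

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers"
	_ "github.com/influxdata/telegraf/plugins/parsers/all"
)

func main() {
	// At this commit only the csv parser has been migrated,
	// so the registry holds a single entry.
	for name := range parsers.Parsers {
		fmt.Println(name) // csv
	}
}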


@ -14,27 +14,29 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
type TimeFunc func() time.Time
type Config struct {
ColumnNames []string `toml:"csv_column_names"`
ColumnTypes []string `toml:"csv_column_types"`
Comment string `toml:"csv_comment"`
Delimiter string `toml:"csv_delimiter"`
HeaderRowCount int `toml:"csv_header_row_count"`
MeasurementColumn string `toml:"csv_measurement_column"`
MetricName string `toml:"metric_name"`
SkipColumns int `toml:"csv_skip_columns"`
SkipRows int `toml:"csv_skip_rows"`
TagColumns []string `toml:"csv_tag_columns"`
TimestampColumn string `toml:"csv_timestamp_column"`
TimestampFormat string `toml:"csv_timestamp_format"`
Timezone string `toml:"csv_timezone"`
TrimSpace bool `toml:"csv_trim_space"`
SkipValues []string `toml:"csv_skip_values"`
SkipErrors bool `toml:"csv_skip_errors"`
type Parser struct {
ColumnNames []string `toml:"csv_column_names"`
ColumnTypes []string `toml:"csv_column_types"`
Comment string `toml:"csv_comment"`
Delimiter string `toml:"csv_delimiter"`
HeaderRowCount int `toml:"csv_header_row_count"`
MeasurementColumn string `toml:"csv_measurement_column"`
MetricName string `toml:"metric_name"`
SkipColumns int `toml:"csv_skip_columns"`
SkipRows int `toml:"csv_skip_rows"`
TagColumns []string `toml:"csv_tag_columns"`
TimestampColumn string `toml:"csv_timestamp_column"`
TimestampFormat string `toml:"csv_timestamp_format"`
Timezone string `toml:"csv_timezone"`
TrimSpace bool `toml:"csv_trim_space"`
SkipValues []string `toml:"csv_skip_values"`
SkipErrors bool `toml:"csv_skip_errors"`
Log telegraf.Logger `toml:"-"`
gotColumnNames bool
@ -42,42 +44,36 @@ type Config struct {
DefaultTags map[string]string
}
// Parser is a CSV parser, you should use NewParser to create a new instance.
type Parser struct {
*Config
Log telegraf.Logger
}
func NewParser(c *Config) (*Parser, error) {
if c.HeaderRowCount == 0 && len(c.ColumnNames) == 0 {
return nil, fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified")
func (p *Parser) Init() error {
if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 {
return fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified")
}
if c.Delimiter != "" {
runeStr := []rune(c.Delimiter)
if p.Delimiter != "" {
runeStr := []rune(p.Delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Delimiter)
return fmt.Errorf("csv_delimiter must be a single character, got: %s", p.Delimiter)
}
}
if c.Comment != "" {
runeStr := []rune(c.Comment)
if p.Comment != "" {
runeStr := []rune(p.Comment)
if len(runeStr) > 1 {
return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Comment)
return fmt.Errorf("csv_delimiter must be a single character, got: %s", p.Comment)
}
}
if len(c.ColumnNames) > 0 && len(c.ColumnTypes) > 0 && len(c.ColumnNames) != len(c.ColumnTypes) {
return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
if len(p.ColumnNames) > 0 && len(p.ColumnTypes) > 0 && len(p.ColumnNames) != len(p.ColumnTypes) {
return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
}
c.gotColumnNames = len(c.ColumnNames) > 0
p.gotColumnNames = len(p.ColumnNames) > 0
if c.TimeFunc == nil {
c.TimeFunc = time.Now
if p.TimeFunc == nil {
p.TimeFunc = time.Now
}
return &Parser{Config: c}, nil
return nil
}
func (p *Parser) SetTimeFunc(fn TimeFunc) {
@ -322,3 +318,30 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func init() {
parsers.Add("csv",
func(defaultMetricName string) telegraf.Parser {
return &Parser{MetricName: defaultMetricName}
})
}
func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.HeaderRowCount = config.CSVHeaderRowCount
p.SkipRows = config.CSVSkipRows
p.SkipColumns = config.CSVSkipColumns
p.Delimiter = config.CSVDelimiter
p.Comment = config.CSVComment
p.TrimSpace = config.CSVTrimSpace
p.ColumnNames = config.CSVColumnNames
p.ColumnTypes = config.CSVColumnTypes
p.TagColumns = config.CSVTagColumns
p.MeasurementColumn = config.CSVMeasurementColumn
p.TimestampColumn = config.CSVTimestampColumn
p.TimestampFormat = config.CSVTimestampFormat
p.Timezone = config.CSVTimezone
p.DefaultTags = config.DefaultTags
p.SkipValues = config.CSVSkipValues
return p.Init()
}
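Side by side, a sketch of the two instantiation paths this shim keeps alive (hypothetical consumer code; the field values are illustrative):

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	// New style: configure the exported struct fields and call Init.
	direct := &csv.Parser{MetricName: "example", HeaderRowCount: 1}
	fmt.Println(direct.Init()) // <nil>

	// Old style: parsers.NewParser now reaches csv through the registry
	// and the InitFromConfig compatibility shim above.
	compat, err := parsers.NewParser(&parsers.Config{
		DataFormat:        "csv",
		MetricName:        "example",
		CSVHeaderRowCount: 1,
	})
	fmt.Println(compat != nil, err) // true <nil>
}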


@ -18,13 +18,12 @@ var DefaultTime = func() time.Time {
}
func TestBasicCSV(t *testing.T) {
p, err := NewParser(
&Config{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"third"},
TimeFunc: DefaultTime,
},
)
p := &Parser{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"third"},
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
_, err = p.ParseLine("1.4,true,hi")
@ -32,13 +31,12 @@ func TestBasicCSV(t *testing.T) {
}
func TestHeaderConcatenationCSV(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 2,
MeasurementColumn: "3",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 2,
MeasurementColumn: "3",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `first,second
1,2,3
@ -50,14 +48,13 @@ func TestHeaderConcatenationCSV(t *testing.T) {
}
func TestHeaderOverride(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
3.4,70,test_name`
@ -72,14 +69,13 @@ func TestHeaderOverride(t *testing.T) {
testCSVRows := []string{"line1,line2,line3\r\n", "3.4,70,test_name\r\n"}
p, err = NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
},
)
p = &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
metrics, err = p.Parse([]byte(testCSVRows[0]))
require.NoError(t, err)
@ -91,16 +87,15 @@ func TestHeaderOverride(t *testing.T) {
}
func TestTimestamp(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
@ -114,16 +109,15 @@ func TestTimestamp(t *testing.T) {
}
func TestTimestampYYYYMMDDHHmm(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "200601021504",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "200601021504",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
@ -136,15 +130,14 @@ func TestTimestampYYYYMMDDHHmm(t *testing.T) {
require.Equal(t, metrics[1].Time().UnixNano(), int64(1247328300000000000))
}
func TestTimestampError(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name
@ -154,16 +147,15 @@ func TestTimestampError(t *testing.T) {
}
func TestTimestampUnixFormat(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
1243094706,70,test_name
@ -175,16 +167,15 @@ func TestTimestampUnixFormat(t *testing.T) {
}
func TestTimestampUnixMSFormat(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix_ms",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix_ms",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
1243094706123,70,test_name
@ -196,14 +187,13 @@ func TestTimestampUnixMSFormat(t *testing.T) {
}
func TestQuotedCharacter(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
@ -214,15 +204,14 @@ func TestQuotedCharacter(t *testing.T) {
}
func TestDelimiter(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
Delimiter: "%",
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
Delimiter: "%",
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1%line2%line3
@ -233,15 +222,14 @@ func TestDelimiter(t *testing.T) {
}
func TestValueConversion(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 0,
Delimiter: ",",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 0,
Delimiter: ",",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `3.3,4,true,hello`
@ -275,15 +263,14 @@ func TestValueConversion(t *testing.T) {
}
func TestSkipComment(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 0,
Comment: "#",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 0,
Comment: "#",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `#3.3,4,true,hello
4,9.9,true,name_this`
@ -301,15 +288,14 @@ func TestSkipComment(t *testing.T) {
}
func TestTrimSpace(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 0,
TrimSpace: true,
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 0,
TrimSpace: true,
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := ` 3.3, 4, true,hello`
@ -324,13 +310,12 @@ func TestTrimSpace(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
p, err = NewParser(
&Config{
HeaderRowCount: 2,
TrimSpace: true,
TimeFunc: DefaultTime,
},
)
p = &Parser{
HeaderRowCount: 2,
TrimSpace: true,
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
testCSV = " col , col ,col\n" +
" 1 , 2 ,3\n" +
@ -342,15 +327,15 @@ func TestTrimSpace(t *testing.T) {
}
func TestTrimSpaceDelimitedBySpace(t *testing.T) {
p, err := NewParser(
&Config{
Delimiter: " ",
HeaderRowCount: 1,
TrimSpace: true,
TimeFunc: DefaultTime,
},
)
p := &Parser{
Delimiter: " ",
HeaderRowCount: 1,
TrimSpace: true,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := ` first second third fourth
abcdefgh 0 2 false
abcdef 3.3 4 true
@ -369,16 +354,16 @@ abcdefgh 0 2 false
}
func TestSkipRows(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
},
)
p := &Parser{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `garbage nonsense
line1,line2,line3
hello,80,test_name2`
@ -395,15 +380,14 @@ hello,80,test_name2`
require.Equal(t, expectedFields, metrics[0].Fields())
require.Equal(t, expectedTags, metrics[0].Tags())
p, err = NewParser(
&Config{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
},
)
p = &Parser{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
testCSVRows := []string{"garbage nonsense\r\n", "line1,line2,line3\r\n", "hello,80,test_name2\r\n"}
@ -422,13 +406,12 @@ hello,80,test_name2`
}
func TestSkipColumns(t *testing.T) {
p, err := NewParser(
&Config{
SkipColumns: 1,
ColumnNames: []string{"line1", "line2"},
TimeFunc: DefaultTime,
},
)
p := &Parser{
SkipColumns: 1,
ColumnNames: []string{"line1", "line2"},
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `hello,80,test_name`
@ -442,14 +425,14 @@ func TestSkipColumns(t *testing.T) {
}
func TestSkipColumnsWithHeader(t *testing.T) {
p, err := NewParser(
&Config{
SkipColumns: 1,
HeaderRowCount: 2,
TimeFunc: DefaultTime,
},
)
p := &Parser{
SkipColumns: 1,
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `col,col,col
1,2,3
trash,80,test_name`
@ -461,13 +444,11 @@ trash,80,test_name`
}
func TestMultiHeader(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
},
)
require.NoError(t, err)
p := &Parser{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
require.NoError(t, p.Init())
testCSV := `col,col
1,2
80,test_name`
@ -478,12 +459,11 @@ func TestMultiHeader(t *testing.T) {
testCSVRows := []string{"col,col\r\n", "1,2\r\n", "80,test_name\r\n"}
p, err = NewParser(
&Config{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
},
)
p = &Parser{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
metrics, err = p.Parse([]byte(testCSVRows[0]))
@ -499,13 +479,12 @@ func TestMultiHeader(t *testing.T) {
}
func TestParseStream(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
csvHeader := "a,b,c"
@ -530,14 +509,12 @@ func TestParseStream(t *testing.T) {
}
func TestParseLineMultiMetricErrorMessage(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
},
)
require.NoError(t, err)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
}
require.NoError(t, p.Init())
csvHeader := "a,b,c"
csvOneRow := "1,2,3"
@ -568,16 +545,16 @@ func TestParseLineMultiMetricErrorMessage(t *testing.T) {
}
func TestTimestampUnixFloatPrecision(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
ColumnNames: []string{"time", "value"},
TimestampColumn: "time",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
},
)
p := &Parser{
MetricName: "csv",
ColumnNames: []string{"time", "value"},
TimestampColumn: "time",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
data := `1551129661.95456123352050781250,42`
expected := []telegraf.Metric{
@ -597,17 +574,17 @@ func TestTimestampUnixFloatPrecision(t *testing.T) {
}
func TestSkipMeasurementColumn(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
}
err := p.Init()
require.NoError(t, err)
data := `id,value,timestamp
1,5,1551129661.954561233`
@ -629,17 +606,17 @@ func TestSkipMeasurementColumn(t *testing.T) {
}
func TestSkipTimestampColumn(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
}
err := p.Init()
require.NoError(t, err)
data := `id,value,timestamp
1,5,1551129661.954561233`
@ -661,18 +638,18 @@ func TestSkipTimestampColumn(t *testing.T) {
}
func TestTimestampTimezone(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
Timezone: "Asia/Jakarta",
},
)
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
Timezone: "Asia/Jakarta",
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
23/05/09 11:05:06 PM,70,test_name
07/11/09 11:05:06 PM,80,test_name2`
@ -684,15 +661,15 @@ func TestTimestampTimezone(t *testing.T) {
}
func TestEmptyMeasurementName(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"", "b"},
MeasurementColumn: "",
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"", "b"},
MeasurementColumn: "",
}
err := p.Init()
require.NoError(t, err)
testCSV := `,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
@ -711,15 +688,15 @@ func TestEmptyMeasurementName(t *testing.T) {
}
func TestNumericMeasurementName(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
MeasurementColumn: "a",
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
MeasurementColumn: "a",
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
@ -738,14 +715,14 @@ func TestNumericMeasurementName(t *testing.T) {
}
func TestStaticMeasurementName(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
@ -765,15 +742,15 @@ func TestStaticMeasurementName(t *testing.T) {
}
func TestSkipEmptyStringValue(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{""},
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{""},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,""`
metrics, err := p.Parse([]byte(testCSV))
@ -792,15 +769,15 @@ func TestSkipEmptyStringValue(t *testing.T) {
}
func TestSkipSpecifiedStringValue(t *testing.T) {
p, err := NewParser(
&Config{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{"MM"},
},
)
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{"MM"},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,MM`
metrics, err := p.Parse([]byte(testCSV))
@ -819,17 +796,17 @@ func TestSkipSpecifiedStringValue(t *testing.T) {
}
func TestSkipErrorOnCorruptedCSVLine(t *testing.T) {
p, err := NewParser(
&Config{
HeaderRowCount: 1,
TimestampColumn: "date",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
SkipErrors: true,
},
)
p := &Parser{
HeaderRowCount: 1,
TimestampColumn: "date",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
SkipErrors: true,
Log: testutil.Logger{},
}
err := p.Init()
require.NoError(t, err)
p.Log = testutil.Logger{}
testCSV := `date,a,b
23/05/09 11:05:06 PM,1,2
corrupted_line


@ -5,7 +5,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
@ -22,6 +21,17 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/xpath"
)
// Creator is the function to create a new parser
type Creator func(defaultMetricName string) telegraf.Parser
// Parsers contains the registry of all known parsers (following the new style)
var Parsers = map[string]Creator{}
// Add adds a parser to the registry. Usually this function is called in the plugin's init function
func Add(name string, creator Creator) {
Parsers[name] = creator
}
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse
@ -62,6 +72,12 @@ type Parser interface {
SetDefaultTags(tags map[string]string)
}
// ParserCompatibility is an interface for backward-compatible initialization of new parsers
type ParserCompatibility interface {
// InitFromConfig sets the parser internal variables from the old-style config
InitFromConfig(config *Config) error
}
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
@ -152,7 +168,6 @@ type Config struct {
CSVTimezone string `toml:"csv_timezone"`
CSVTrimSpace bool `toml:"csv_trim_space"`
CSVSkipValues []string `toml:"csv_skip_values"`
CSVSkipErrors bool `toml:"csv_skip_errors"`
// FormData configuration
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
@ -233,28 +248,6 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatternFiles,
config.GrokTimezone,
config.GrokUniqueTimestamp)
case "csv":
config := &csv.Config{
MetricName: config.MetricName,
HeaderRowCount: config.CSVHeaderRowCount,
SkipRows: config.CSVSkipRows,
SkipColumns: config.CSVSkipColumns,
Delimiter: config.CSVDelimiter,
Comment: config.CSVComment,
TrimSpace: config.CSVTrimSpace,
ColumnNames: config.CSVColumnNames,
ColumnTypes: config.CSVColumnTypes,
TagColumns: config.CSVTagColumns,
MeasurementColumn: config.CSVMeasurementColumn,
TimestampColumn: config.CSVTimestampColumn,
TimestampFormat: config.CSVTimestampFormat,
Timezone: config.CSVTimezone,
DefaultTags: config.DefaultTags,
SkipValues: config.CSVSkipValues,
SkipErrors: config.CSVSkipErrors,
}
return csv.NewParser(config)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
case "form_urlencoded":
@ -282,7 +275,19 @@ func NewParser(config *Config) (Parser, error) {
case "json_v2":
parser, err = NewJSONPathParser(config.JSONV2Config)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
creator, found := Parsers[config.DataFormat]
if !found {
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
}
// Try to create new-style parsers the old way...
// DEPRECATED: Please instantiate the parser directly instead of using this function.
parser = creator(config.MetricName)
p, ok := parser.(ParserCompatibility)
if !ok {
return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat)
}
err = p.InitFromConfig(config)
}
return parser, err
}
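From a parser author's perspective, migrating a format means registering a creator and, for the transition period, implementing ParserCompatibility. A self-contained sketch for a hypothetical "echo" format, mirroring what the csv parser does in this commit:

package echo

import (
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/parsers"
)

// Parser turns the whole input buffer into a single metric.
type Parser struct {
	MetricName  string
	defaultTags map[string]string
}

func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
	m, err := p.ParseLine(string(buf))
	if err != nil {
		return nil, err
	}
	return []telegraf.Metric{m}, nil
}

func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	fields := map[string]interface{}{"raw": line}
	return metric.New(p.MetricName, p.defaultTags, fields, time.Now()), nil
}

func (p *Parser) SetDefaultTags(tags map[string]string) { p.defaultTags = tags }

// InitFromConfig implements ParserCompatibility so the old
// parsers.NewParser path above keeps working for this format.
func (p *Parser) InitFromConfig(config *parsers.Config) error {
	p.MetricName = config.MetricName
	return nil
}

// Registering in init makes the format discoverable via parsers.Parsers,
// and thus via probeParser/addParser in the config package.
func init() {
	parsers.Add("echo", func(defaultMetricName string) telegraf.Parser {
		return &Parser{MetricName: defaultMetricName}
	})
}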


@ -0,0 +1,70 @@
package parsers_test
import (
"reflect"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
)
func TestRegistry_BackwardCompatibility(t *testing.T) {
cfg := &parsers.Config{
MetricName: "parser_compatibility_test",
CSVHeaderRowCount: 42,
}
// Some parsers need certain settings to not error. Furthermore, we
// might need to clear some (pointer) fields for comparison...
override := map[string]struct {
param map[string]interface{}
mask []string
}{
"csv": {
param: map[string]interface{}{
"HeaderRowCount": cfg.CSVHeaderRowCount,
},
mask: []string{"TimeFunc"},
},
}
for name, creator := range parsers.Parsers {
t.Logf("testing %q...", name)
cfg.DataFormat = name
// Create parser the new way
expected := creator(cfg.MetricName)
if settings, found := override[name]; found {
s := reflect.Indirect(reflect.ValueOf(expected))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
if p, ok := expected.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
// Create parser the old way
actual, err := parsers.NewParser(cfg)
require.NoError(t, err)
// Compare with mask
if settings, found := override[name]; found {
a := reflect.Indirect(reflect.ValueOf(actual))
e := reflect.Indirect(reflect.ValueOf(expected))
for _, key := range settings.mask {
af := a.FieldByName(key)
ef := e.FieldByName(key)
v := reflect.Zero(ef.Type())
af.Set(v)
ef.Set(v)
}
}
require.EqualValuesf(t, expected, actual, "format %q", name)
}
}


@ -13,7 +13,7 @@ type Parser struct {
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Log telegraf.Logger `toml:"-"`
parser parsers.Parser
parser telegraf.Parser
}
var SampleConfig = `