chore(parsers)!: Remove old-style creation (#13310)
This commit is contained in:
parent
8762c71e65
commit
7ced2606b2
|
|
@ -1,6 +1,12 @@
|
|||
<!-- markdownlint-disable MD024 -->
|
||||
# Changelog
|
||||
|
||||
## next
|
||||
|
||||
### BREAKING API CHANGES
|
||||
|
||||
- Removal of old-style parser creation
|
||||
|
||||
## v1.26.3 [2023-05-22]
|
||||
|
||||
### Bugfixes
|
||||
|
|
|
|||
|
|
@ -1228,17 +1228,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
|||
t.SetParser(parser)
|
||||
}
|
||||
|
||||
// Keep the old interface for backward compatibility
|
||||
if t, ok := input.(parsers.ParserInput); ok {
|
||||
// DEPRECATED: Please switch your plugin to telegraf.ParserPlugin.
|
||||
missCountThreshold = 1
|
||||
parser, err := c.addParser("inputs", name, table)
|
||||
if err != nil {
|
||||
return fmt.Errorf("adding parser failed: %w", err)
|
||||
}
|
||||
t.SetParser(parser)
|
||||
}
|
||||
|
||||
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
|
||||
missCountThreshold = 1
|
||||
if !c.probeParser("inputs", name, table) {
|
||||
|
|
@ -1249,17 +1238,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
|||
})
|
||||
}
|
||||
|
||||
if t, ok := input.(parsers.ParserFuncInput); ok {
|
||||
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
|
||||
missCountThreshold = 1
|
||||
if !c.probeParser("inputs", name, table) {
|
||||
return errors.New("parser not found")
|
||||
}
|
||||
t.SetParserFunc(func() (parsers.Parser, error) {
|
||||
return c.addParser("inputs", name, table)
|
||||
})
|
||||
}
|
||||
|
||||
pluginConfig, err := c.buildInput(name, table)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -758,7 +758,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
|
||||
func TestConfig_ParserInterface(t *testing.T) {
|
||||
formats := []string{
|
||||
"collectd",
|
||||
"csv",
|
||||
|
|
@ -782,21 +782,13 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
|
|||
require.NoError(t, c.LoadConfig("./testdata/parsers_new.toml"))
|
||||
require.Len(t, c.Inputs, len(formats))
|
||||
|
||||
cfg := parsers.Config{
|
||||
CSVHeaderRowCount: 42,
|
||||
DropwizardTagPathsMap: make(map[string]string),
|
||||
GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"},
|
||||
JSONStrict: true,
|
||||
MetricName: "parser_test_new",
|
||||
}
|
||||
|
||||
override := map[string]struct {
|
||||
param map[string]interface{}
|
||||
mask []string
|
||||
}{
|
||||
"csv": {
|
||||
param: map[string]interface{}{
|
||||
"HeaderRowCount": cfg.CSVHeaderRowCount,
|
||||
"HeaderRowCount": 42,
|
||||
},
|
||||
mask: []string{"TimeFunc", "ResetMode"},
|
||||
},
|
||||
|
|
@ -810,15 +802,12 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
|
|||
|
||||
expected := make([]telegraf.Parser, 0, len(formats))
|
||||
for _, format := range formats {
|
||||
formatCfg := &cfg
|
||||
formatCfg.DataFormat = format
|
||||
|
||||
logger := models.NewLogger("parsers", format, cfg.MetricName)
|
||||
logger := models.NewLogger("parsers", format, "parser_test_new")
|
||||
|
||||
creator, found := parsers.Parsers[format]
|
||||
require.Truef(t, found, "No parser for format %q", format)
|
||||
|
||||
parser := creator(formatCfg.MetricName)
|
||||
parser := creator("parser_test_new")
|
||||
if settings, found := override[format]; found {
|
||||
s := reflect.Indirect(reflect.ValueOf(parser))
|
||||
for key, value := range settings.param {
|
||||
|
|
@ -878,126 +867,6 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
|
||||
formats := []string{
|
||||
"collectd",
|
||||
"csv",
|
||||
"dropwizard",
|
||||
"form_urlencoded",
|
||||
"graphite",
|
||||
"grok",
|
||||
"influx",
|
||||
"json",
|
||||
"json_v2",
|
||||
"logfmt",
|
||||
"nagios",
|
||||
"prometheus",
|
||||
"prometheusremotewrite",
|
||||
"value",
|
||||
"wavefront",
|
||||
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
|
||||
}
|
||||
|
||||
c := NewConfig()
|
||||
require.NoError(t, c.LoadConfig("./testdata/parsers_old.toml"))
|
||||
require.Len(t, c.Inputs, len(formats))
|
||||
|
||||
cfg := parsers.Config{
|
||||
CSVHeaderRowCount: 42,
|
||||
DropwizardTagPathsMap: make(map[string]string),
|
||||
GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"},
|
||||
JSONStrict: true,
|
||||
MetricName: "parser_test_old",
|
||||
}
|
||||
|
||||
override := map[string]struct {
|
||||
param map[string]interface{}
|
||||
mask []string
|
||||
}{
|
||||
"csv": {
|
||||
param: map[string]interface{}{
|
||||
"HeaderRowCount": cfg.CSVHeaderRowCount,
|
||||
},
|
||||
mask: []string{"TimeFunc", "ResetMode"},
|
||||
},
|
||||
"xpath_protobuf": {
|
||||
param: map[string]interface{}{
|
||||
"ProtobufMessageDef": "testdata/addressbook.proto",
|
||||
"ProtobufMessageType": "addressbook.AddressBook",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
expected := make([]telegraf.Parser, 0, len(formats))
|
||||
for _, format := range formats {
|
||||
formatCfg := &cfg
|
||||
formatCfg.DataFormat = format
|
||||
|
||||
logger := models.NewLogger("parsers", format, cfg.MetricName)
|
||||
|
||||
creator, found := parsers.Parsers[format]
|
||||
require.Truef(t, found, "No parser for format %q", format)
|
||||
|
||||
parser := creator(formatCfg.MetricName)
|
||||
if settings, found := override[format]; found {
|
||||
s := reflect.Indirect(reflect.ValueOf(parser))
|
||||
for key, value := range settings.param {
|
||||
v := reflect.ValueOf(value)
|
||||
s.FieldByName(key).Set(v)
|
||||
}
|
||||
}
|
||||
models.SetLoggerOnPlugin(parser, logger)
|
||||
if p, ok := parser.(telegraf.Initializer); ok {
|
||||
require.NoError(t, p.Init())
|
||||
}
|
||||
expected = append(expected, parser)
|
||||
}
|
||||
require.Len(t, expected, len(formats))
|
||||
|
||||
actual := make([]interface{}, 0)
|
||||
generated := make([]interface{}, 0)
|
||||
for _, plugin := range c.Inputs {
|
||||
input, ok := plugin.Input.(*MockupInputPluginParserOld)
|
||||
require.True(t, ok)
|
||||
// Get the parser set with 'SetParser()'
|
||||
if p, ok := input.Parser.(*models.RunningParser); ok {
|
||||
actual = append(actual, p.Parser)
|
||||
} else {
|
||||
actual = append(actual, input.Parser)
|
||||
}
|
||||
// Get the parser set with 'SetParserFunc()'
|
||||
g, err := input.ParserFunc()
|
||||
require.NoError(t, err)
|
||||
if rp, ok := g.(*models.RunningParser); ok {
|
||||
generated = append(generated, rp.Parser)
|
||||
} else {
|
||||
generated = append(generated, g)
|
||||
}
|
||||
}
|
||||
require.Len(t, actual, len(formats))
|
||||
|
||||
for i, format := range formats {
|
||||
// Determine the underlying type of the parser
|
||||
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
|
||||
// Ignore all unexported fields and fields not relevant for functionality
|
||||
options := []cmp.Option{
|
||||
cmpopts.IgnoreUnexported(stype),
|
||||
cmpopts.IgnoreTypes(sync.Mutex{}),
|
||||
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
|
||||
}
|
||||
if settings, found := override[format]; found {
|
||||
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
|
||||
}
|
||||
|
||||
// Do a manual comparison as require.EqualValues will also work on unexported fields
|
||||
// that cannot be cleared or ignored.
|
||||
diff := cmp.Diff(expected[i], actual[i], options...)
|
||||
require.Emptyf(t, diff, "Difference in SetParser() for %q", format)
|
||||
diff = cmp.Diff(expected[i], generated[i], options...)
|
||||
require.Emptyf(t, diff, "Difference in SetParserFunc() for %q", format)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfig_MultipleProcessorsOrder(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -1362,25 +1231,6 @@ func TestPersisterProcessorRegistration(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
/*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/
|
||||
type MockupInputPluginParserOld struct {
|
||||
Parser parsers.Parser
|
||||
ParserFunc parsers.ParserFunc
|
||||
}
|
||||
|
||||
func (m *MockupInputPluginParserOld) SampleConfig() string {
|
||||
return "Mockup old parser test plugin"
|
||||
}
|
||||
func (m *MockupInputPluginParserOld) Gather(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupInputPluginParserOld) SetParser(parser parsers.Parser) {
|
||||
m.Parser = parser
|
||||
}
|
||||
func (m *MockupInputPluginParserOld) SetParserFunc(f parsers.ParserFunc) {
|
||||
m.ParserFunc = f
|
||||
}
|
||||
|
||||
/*** Mockup INPUT plugin for (new) parser testing to avoid cyclic dependencies ***/
|
||||
type MockupInputPluginParserNew struct {
|
||||
Parser telegraf.Parser
|
||||
|
|
@ -1695,9 +1545,6 @@ func init() {
|
|||
inputs.Add("parser_test_new", func() telegraf.Input {
|
||||
return &MockupInputPluginParserNew{}
|
||||
})
|
||||
inputs.Add("parser_test_old", func() telegraf.Input {
|
||||
return &MockupInputPluginParserOld{}
|
||||
})
|
||||
inputs.Add("parser", func() telegraf.Input {
|
||||
return &MockupInputPluginParserOnly{}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,60 +0,0 @@
|
|||
[[inputs.parser_test_old]]
|
||||
data_format = "collectd"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "csv"
|
||||
csv_header_row_count = 42
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "dropwizard"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "form_urlencoded"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "graphite"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "grok"
|
||||
grok_patterns = ["%{COMBINED_LOG_FORMAT}"]
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "influx"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "json"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "json_v2"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "logfmt"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "nagios"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "prometheus"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "prometheusremotewrite"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "value"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "wavefront"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "xml"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "xpath_json"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "xpath_msgpack"
|
||||
|
||||
[[inputs.parser_test_old]]
|
||||
data_format = "xpath_protobuf"
|
||||
xpath_protobuf_file = "testdata/addressbook.proto"
|
||||
xpath_protobuf_type = "addressbook.AddressBook"
|
||||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -62,7 +61,7 @@ type AMQPConsumer struct {
|
|||
|
||||
deliveries map[telegraf.TrackingID]amqp.Delivery
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
conn *amqp.Connection
|
||||
wg *sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
|
@ -122,7 +121,7 @@ func (a *AMQPConsumer) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
|
||||
func (a *AMQPConsumer) SetParser(parser telegraf.Parser) {
|
||||
a.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ import (
|
|||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -58,7 +57,7 @@ type PubSub struct {
|
|||
|
||||
cancel context.CancelFunc
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
wg *sync.WaitGroup
|
||||
acc telegraf.TrackingAccumulator
|
||||
|
||||
|
|
@ -78,7 +77,7 @@ func (ps *PubSub) Gather(_ telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
// SetParser implements ParserInput interface.
|
||||
func (ps *PubSub) SetParser(parser parsers.Parser) {
|
||||
func (ps *PubSub) SetParser(parser telegraf.Parser) {
|
||||
ps.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/influxdata/telegraf/config"
|
||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -41,7 +40,7 @@ type PubSubPush struct {
|
|||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
|
||||
tlsint.ServerConfig
|
||||
parsers.Parser
|
||||
telegraf.Parser
|
||||
|
||||
server *http.Server
|
||||
acc telegraf.TrackingAccumulator
|
||||
|
|
@ -74,7 +73,7 @@ func (p *PubSubPush) Gather(_ telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *PubSubPush) SetParser(parser parsers.Parser) {
|
||||
func (p *PubSubPush) SetParser(parser telegraf.Parser) {
|
||||
p.Parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ type DirectoryMonitor struct {
|
|||
filesInUse sync.Map
|
||||
cancel context.CancelFunc
|
||||
context context.Context
|
||||
parserFunc parsers.ParserFunc
|
||||
parserFunc telegraf.ParserFunc
|
||||
filesProcessed selfstat.Stat
|
||||
filesProcessedDir selfstat.Stat
|
||||
filesDropped selfstat.Stat
|
||||
|
|
@ -253,7 +253,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
|
|||
return monitor.parseFile(parser, reader, file.Name())
|
||||
}
|
||||
|
||||
func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
|
||||
func (monitor *DirectoryMonitor) parseFile(parser telegraf.Parser, reader io.Reader, fileName string) error {
|
||||
var splitter bufio.SplitFunc
|
||||
|
||||
// Decide on how to split the file
|
||||
|
|
@ -283,7 +283,7 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read
|
|||
return scanner.Err()
|
||||
}
|
||||
|
||||
func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Reader, fileName string) error {
|
||||
func (monitor *DirectoryMonitor) parseAtOnce(parser telegraf.Parser, reader io.Reader, fileName string) error {
|
||||
bytes, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -297,7 +297,7 @@ func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Re
|
|||
return monitor.sendMetrics(metrics)
|
||||
}
|
||||
|
||||
func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
|
||||
func (monitor *DirectoryMonitor) parseMetrics(parser telegraf.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
|
||||
metrics, err = parser.Parse(line)
|
||||
if err != nil {
|
||||
if errors.Is(err, parsers.ErrEOF) {
|
||||
|
|
@ -391,7 +391,7 @@ func (monitor *DirectoryMonitor) isIgnoredFile(fileName string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (monitor *DirectoryMonitor) SetParserFunc(fn parsers.ParserFunc) {
|
||||
func (monitor *DirectoryMonitor) SetParserFunc(fn telegraf.ParserFunc) {
|
||||
monitor.parserFunc = fn
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,8 +10,8 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
|
@ -53,7 +53,7 @@ func TestCSVGZImport(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
HeaderRowCount: 1,
|
||||
}
|
||||
|
|
@ -119,7 +119,7 @@ func TestCSVGZImportWithHeader(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
HeaderRowCount: 1,
|
||||
SkipRows: 1,
|
||||
|
|
@ -189,7 +189,7 @@ func TestMultipleJSONFileImports(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
p := &json.Parser{NameKey: "Name"}
|
||||
err := p.Init()
|
||||
return p, err
|
||||
|
|
@ -240,7 +240,7 @@ func TestFileTag(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
p := &json.Parser{NameKey: "Name"}
|
||||
err := p.Init()
|
||||
return p, err
|
||||
|
|
@ -292,7 +292,7 @@ func TestCSVNoSkipRows(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
HeaderRowCount: 1,
|
||||
SkipRows: 0,
|
||||
|
|
@ -361,7 +361,7 @@ func TestCSVSkipRows(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
HeaderRowCount: 1,
|
||||
SkipRows: 2,
|
||||
|
|
@ -432,7 +432,7 @@ func TestCSVMultiHeader(t *testing.T) {
|
|||
err := r.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
HeaderRowCount: 2,
|
||||
TagColumns: []string{"line1"},
|
||||
|
|
@ -501,7 +501,7 @@ func TestParseCompleteFile(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
r.Log = testutil.Logger{}
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := &json.Parser{
|
||||
NameKey: "name",
|
||||
TagKeys: []string{"tag1"},
|
||||
|
|
@ -553,7 +553,7 @@ func TestParseSubdirectories(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
r.Log = testutil.Logger{}
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := &json.Parser{
|
||||
NameKey: "name",
|
||||
TagKeys: []string{"tag1"},
|
||||
|
|
@ -631,7 +631,7 @@ func TestParseSubdirectoriesFilesIgnore(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
r.Log = testutil.Logger{}
|
||||
|
||||
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||
r.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := &json.Parser{
|
||||
NameKey: "name",
|
||||
TagKeys: []string{"tag1"},
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -64,7 +63,7 @@ type EventHub struct {
|
|||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
in chan []telegraf.Metric
|
||||
}
|
||||
|
||||
|
|
@ -73,7 +72,7 @@ func (*EventHub) SampleConfig() string {
|
|||
}
|
||||
|
||||
// SetParser sets the parser
|
||||
func (e *EventHub) SetParser(parser parsers.Parser) {
|
||||
func (e *EventHub) SetParser(parser telegraf.Parser) {
|
||||
e.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/nagios"
|
||||
)
|
||||
|
||||
|
|
@ -40,7 +39,7 @@ type Exec struct {
|
|||
Timeout config.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
|
||||
runner Runner
|
||||
|
||||
|
|
@ -164,7 +163,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
|
|||
}
|
||||
}
|
||||
|
||||
func (e *Exec) SetParser(parser parsers.Parser) {
|
||||
func (e *Exec) SetParser(parser telegraf.Parser) {
|
||||
e.parser = parser
|
||||
unwrapped, ok := parser.(*models.RunningParser)
|
||||
if ok {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal/process"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
)
|
||||
|
||||
|
|
@ -33,7 +32,7 @@ type Execd struct {
|
|||
|
||||
process *process.Process
|
||||
acc telegraf.Accumulator
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
outputReader func(io.Reader)
|
||||
}
|
||||
|
||||
|
|
@ -41,7 +40,7 @@ func (*Execd) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (e *Execd) SetParser(parser parsers.Parser) {
|
||||
func (e *Execd) SetParser(parser telegraf.Parser) {
|
||||
e.parser = parser
|
||||
e.outputReader = e.cmdReadOut
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -40,7 +39,7 @@ type GCS struct {
|
|||
Log telegraf.Logger
|
||||
offSet OffSet
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
client *storage.Client
|
||||
|
||||
ctx context.Context
|
||||
|
|
@ -66,7 +65,7 @@ func (gcs *GCS) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (gcs *GCS) SetParser(parser parsers.Parser) {
|
||||
func (gcs *GCS) SetParser(parser telegraf.Parser) {
|
||||
gcs.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
||||
"github.com/influxdata/telegraf"
|
||||
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -179,17 +179,20 @@ func TestRunGatherIterationWithPages(t *testing.T) {
|
|||
require.Equal(t, 0, len(emptyAcc.Metrics))
|
||||
}
|
||||
|
||||
func createParser() parsers.Parser {
|
||||
testParser, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "cpu",
|
||||
JSONQuery: "metrics",
|
||||
TagKeys: []string{"tags_datacenter", "tags_host"},
|
||||
JSONTimeKey: "timestamp",
|
||||
JSONTimeFormat: "unix_ms",
|
||||
})
|
||||
func createParser() telegraf.Parser {
|
||||
p := &jsonparser.Parser{
|
||||
MetricName: "cpu",
|
||||
Query: "metrics",
|
||||
TagKeys: []string{"tags_datacenter", "tags_host"},
|
||||
TimeKey: "timestamp",
|
||||
TimeFormat: "unix_ms",
|
||||
Strict: true,
|
||||
}
|
||||
if err := p.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return testParser
|
||||
return p
|
||||
}
|
||||
|
||||
func startGCSServer(t *testing.T) *httptest.Server {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal/choice"
|
||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -70,7 +69,7 @@ type HTTPListenerV2 struct {
|
|||
|
||||
listener net.Listener
|
||||
|
||||
parsers.Parser
|
||||
telegraf.Parser
|
||||
acc telegraf.Accumulator
|
||||
}
|
||||
|
||||
|
|
@ -82,7 +81,7 @@ func (h *HTTPListenerV2) Gather(_ telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) SetParser(parser parsers.Parser) {
|
||||
func (h *HTTPListenerV2) SetParser(parser telegraf.Parser) {
|
||||
h.Parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/common/kafka"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -64,7 +63,7 @@ type KafkaConsumer struct {
|
|||
ticker *time.Ticker
|
||||
fingerprint string
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
topicLock sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
|
@ -90,7 +89,7 @@ func (*KafkaConsumer) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (k *KafkaConsumer) SetParser(parser parsers.Parser) {
|
||||
func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
|
||||
k.parser = parser
|
||||
}
|
||||
|
||||
|
|
@ -359,7 +358,7 @@ type Message struct {
|
|||
session sarama.ConsumerGroupSession
|
||||
}
|
||||
|
||||
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser, log telegraf.Logger) *ConsumerGroupHandler {
|
||||
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler {
|
||||
handler := &ConsumerGroupHandler{
|
||||
acc: acc.WithTracking(maxUndelivered),
|
||||
sem: make(chan empty, maxUndelivered),
|
||||
|
|
@ -377,7 +376,7 @@ type ConsumerGroupHandler struct {
|
|||
|
||||
acc telegraf.TrackingAccumulator
|
||||
sem semaphore
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -32,7 +31,7 @@ type Kafka struct {
|
|||
PointBuffer int
|
||||
|
||||
Offset string
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -56,7 +55,7 @@ func (*Kafka) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (k *Kafka) SetParser(parser parsers.Parser) {
|
||||
func (k *Kafka) SetParser(parser telegraf.Parser) {
|
||||
k.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
internalaws "github.com/influxdata/telegraf/plugins/common/aws"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -44,7 +43,7 @@ type (
|
|||
Log telegraf.Logger
|
||||
|
||||
cons *consumer.Consumer
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
cancel context.CancelFunc
|
||||
acc telegraf.TrackingAccumulator
|
||||
sem chan struct{}
|
||||
|
|
@ -82,7 +81,7 @@ func (*KinesisConsumer) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (k *KinesisConsumer) SetParser(parser parsers.Parser) {
|
||||
func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
|
||||
k.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
|
@ -54,7 +53,7 @@ func TestKinesisConsumer_onMessage(t *testing.T) {
|
|||
|
||||
type fields struct {
|
||||
ContentEncoding string
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
records map[telegraf.TrackingID]string
|
||||
}
|
||||
type args struct {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal/globpath"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/grok"
|
||||
)
|
||||
|
||||
|
|
@ -64,7 +63,7 @@ type LogParserPlugin struct {
|
|||
|
||||
sync.Mutex
|
||||
|
||||
GrokParser parsers.Parser
|
||||
GrokParser telegraf.Parser
|
||||
GrokConfig GrokConfig `toml:"grok"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/selfstat"
|
||||
)
|
||||
|
||||
|
|
@ -70,7 +69,7 @@ type MQTTConsumer struct {
|
|||
QoS int `toml:"qos"`
|
||||
ConnectionTimeout config.Duration `toml:"connection_timeout"`
|
||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
|
||||
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`
|
||||
PersistentSession bool
|
||||
|
|
@ -98,7 +97,7 @@ func (*MQTTConsumer) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
|
||||
func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
|
||||
m.parser = parser
|
||||
}
|
||||
func (m *MQTTConsumer) Init() error {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -48,8 +47,8 @@ func (c *FakeClient) Disconnect(quiesce uint) {
|
|||
type FakeParser struct {
|
||||
}
|
||||
|
||||
// FakeParser satisfies parsers.Parser
|
||||
var _ parsers.Parser = &FakeParser{}
|
||||
// FakeParser satisfies telegraf.Parser
|
||||
var _ telegraf.Parser = &FakeParser{}
|
||||
|
||||
func (p *FakeParser) Parse(_ []byte) ([]telegraf.Metric, error) {
|
||||
panic("not implemented")
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -63,7 +62,7 @@ type natsConsumer struct {
|
|||
subs []*nats.Subscription
|
||||
jsSubs []*nats.Subscription
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
// channel for all incoming NATS messages
|
||||
in chan *nats.Msg
|
||||
// channel for all NATS read errors
|
||||
|
|
@ -77,7 +76,7 @@ func (*natsConsumer) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (n *natsConsumer) SetParser(parser parsers.Parser) {
|
||||
func (n *natsConsumer) SetParser(parser telegraf.Parser) {
|
||||
n.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -45,7 +44,7 @@ type NSQConsumer struct {
|
|||
|
||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
consumer *nsq.Consumer
|
||||
|
||||
Log telegraf.Logger
|
||||
|
|
@ -61,7 +60,7 @@ func (*NSQConsumer) SampleConfig() string {
|
|||
}
|
||||
|
||||
// SetParser takes the data_format from the config and finds the right parser for that format
|
||||
func (n *NSQConsumer) SetParser(parser parsers.Parser) {
|
||||
func (n *NSQConsumer) SetParser(parser telegraf.Parser) {
|
||||
n.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -55,7 +54,7 @@ type SocketListener struct {
|
|||
tlsint.ServerConfig
|
||||
|
||||
wg sync.WaitGroup
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
splitter bufio.SplitFunc
|
||||
|
||||
listener listener
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ type Tail struct {
|
|||
Log telegraf.Logger `toml:"-"`
|
||||
tailers map[string]*tail.Tail
|
||||
offsets map[string]int64
|
||||
parserFunc parsers.ParserFunc
|
||||
parserFunc telegraf.ParserFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
acc telegraf.TrackingAccumulator
|
||||
|
|
@ -246,7 +246,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
|
|||
}
|
||||
|
||||
// ParseLine parses a line of text.
|
||||
func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) {
|
||||
func parseLine(parser telegraf.Parser, line string) ([]telegraf.Metric, error) {
|
||||
m, err := parser.Parse([]byte(line))
|
||||
if err != nil {
|
||||
if errors.Is(err, parsers.ErrEOF) {
|
||||
|
|
@ -259,7 +259,7 @@ func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) {
|
|||
|
||||
// Receiver is launched as a goroutine to continuously watch a tailed logfile
|
||||
// for changes, parse any incoming msgs, and add to the accumulator.
|
||||
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
|
||||
func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) {
|
||||
// holds the individual lines of multi-line log entries.
|
||||
var buffer bytes.Buffer
|
||||
|
||||
|
|
@ -393,7 +393,7 @@ func (t *Tail) Stop() {
|
|||
offsetsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
|
||||
func (t *Tail) SetParserFunc(fn telegraf.ParserFunc) {
|
||||
t.parserFunc = fn
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/grok"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
|
|
@ -27,7 +26,7 @@ var (
|
|||
testdataDir = getTestdataDir()
|
||||
)
|
||||
|
||||
func NewInfluxParser() (parsers.Parser, error) {
|
||||
func NewInfluxParser() (telegraf.Parser, error) {
|
||||
parser := &influx.Parser{}
|
||||
err := parser.Init()
|
||||
if err != nil {
|
||||
|
|
@ -318,7 +317,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test
|
|||
})
|
||||
}
|
||||
|
||||
func createGrokParser() (parsers.Parser, error) {
|
||||
func createGrokParser() (telegraf.Parser, error) {
|
||||
parser := &grok.Parser{
|
||||
Measurement: "tail_grok",
|
||||
Patterns: []string{"%{TEST_LOG_MULTILINE}"},
|
||||
|
|
@ -347,7 +346,7 @@ cpu,42
|
|||
plugin.Log = testutil.Logger{}
|
||||
plugin.FromBeginning = true
|
||||
plugin.Files = []string{tmpfile.Name()}
|
||||
plugin.SetParserFunc(func() (parsers.Parser, error) {
|
||||
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
MeasurementColumn: "measurement",
|
||||
HeaderRowCount: 1,
|
||||
|
|
@ -408,7 +407,7 @@ skip2,mem,100
|
|||
plugin.Log = testutil.Logger{}
|
||||
plugin.FromBeginning = true
|
||||
plugin.Files = []string{tmpfile.Name()}
|
||||
plugin.SetParserFunc(func() (parsers.Parser, error) {
|
||||
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
parser := csv.Parser{
|
||||
MeasurementColumn: "measurement1",
|
||||
HeaderRowCount: 2,
|
||||
|
|
@ -470,7 +469,7 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
|
|||
plugin.FromBeginning = true
|
||||
plugin.Files = []string{tmpfile.Name()}
|
||||
plugin.PathTag = "customPathTagMyFile"
|
||||
plugin.SetParserFunc(func() (parsers.Parser, error) {
|
||||
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
||||
p := &json.Parser{MetricName: "cpu"}
|
||||
err := p.Init()
|
||||
return p, err
|
||||
|
|
@ -698,7 +697,7 @@ func TestCSVBehavior(t *testing.T) {
|
|||
require.NoError(t, input.Sync())
|
||||
|
||||
// Setup the CSV parser creator function
|
||||
parserFunc := func() (parsers.Parser, error) {
|
||||
parserFunc := func() (telegraf.Parser, error) {
|
||||
parser := &csv.Parser{
|
||||
MetricName: "tail",
|
||||
HeaderRowCount: 1,
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/selfstat"
|
||||
)
|
||||
|
||||
|
|
@ -44,7 +43,7 @@ type TCPListener struct {
|
|||
// track current connections so we can close them in Stop()
|
||||
conns map[string]*net.TCPConn
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
acc telegraf.Accumulator
|
||||
|
||||
MaxConnections selfstat.Stat
|
||||
|
|
@ -73,7 +72,7 @@ func (t *TCPListener) Gather(_ telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *TCPListener) SetParser(parser parsers.Parser) {
|
||||
func (t *TCPListener) SetParser(parser telegraf.Parser) {
|
||||
t.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/selfstat"
|
||||
)
|
||||
|
||||
|
|
@ -49,7 +48,7 @@ type UDPListener struct {
|
|||
// malformed tracks the number of malformed packets
|
||||
malformed int
|
||||
|
||||
parser parsers.Parser
|
||||
parser telegraf.Parser
|
||||
|
||||
// Keep the accumulator in this struct
|
||||
acc telegraf.Accumulator
|
||||
|
|
@ -83,7 +82,7 @@ func (u *UDPListener) Gather(_ telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (u *UDPListener) SetParser(parser parsers.Parser) {
|
||||
func (u *UDPListener) SetParser(parser telegraf.Parser) {
|
||||
u.parser = parser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
serializer "github.com/influxdata/telegraf/plugins/serializers/influx"
|
||||
)
|
||||
|
|
@ -48,7 +47,7 @@ type (
|
|||
stubTopic struct {
|
||||
Settings pubsub.PublishSettings
|
||||
ReturnErr map[string]bool
|
||||
parsers.Parser
|
||||
telegraf.Parser
|
||||
*testing.T
|
||||
Base64Data bool
|
||||
ContentEncoding string
|
||||
|
|
|
|||
|
|
@ -206,12 +206,3 @@ func init() {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.AuthFile = config.CollectdAuthFile
|
||||
p.SecurityLevel = config.CollectdSecurityLevel
|
||||
p.TypesDB = config.CollectdTypesDB
|
||||
p.ParseMultiValue = config.CollectdSplit
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -506,28 +506,3 @@ func init() {
|
|||
return &Parser{MetricName: defaultMetricName}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.HeaderRowCount = config.CSVHeaderRowCount
|
||||
p.SkipRows = config.CSVSkipRows
|
||||
p.SkipColumns = config.CSVSkipColumns
|
||||
p.Delimiter = config.CSVDelimiter
|
||||
p.Comment = config.CSVComment
|
||||
p.TrimSpace = config.CSVTrimSpace
|
||||
p.ColumnNames = config.CSVColumnNames
|
||||
p.ColumnTypes = config.CSVColumnTypes
|
||||
p.TagColumns = config.CSVTagColumns
|
||||
p.TagOverwrite = config.CSVTagOverwrite
|
||||
p.MeasurementColumn = config.CSVMeasurementColumn
|
||||
p.TimestampColumn = config.CSVTimestampColumn
|
||||
p.TimestampFormat = config.CSVTimestampFormat
|
||||
p.Timezone = config.CSVTimezone
|
||||
p.DefaultTags = config.DefaultTags
|
||||
p.SkipValues = config.CSVSkipValues
|
||||
p.MetadataRows = config.CSVMetadataRows
|
||||
p.MetadataSeparators = config.CSVMetadataSeparators
|
||||
p.MetadataTrimSet = config.CSVMetadataTrimSet
|
||||
p.ResetMode = "none"
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -237,16 +237,3 @@ func init() {
|
|||
return &Parser{}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.MetricRegistryPath = config.DropwizardMetricRegistryPath
|
||||
p.TimePath = config.DropwizardTimePath
|
||||
p.TimeFormat = config.DropwizardTimeFormat
|
||||
p.TagsPath = config.DropwizardTagsPath
|
||||
p.TagPathsMap = config.DropwizardTagPathsMap
|
||||
p.Separator = config.Separator
|
||||
p.Templates = append(p.Templates, config.Templates...)
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,12 +99,6 @@ func (p Parser) parseFields(values url.Values) map[string]interface{} {
|
|||
return fields
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.MetricName = config.MetricName
|
||||
p.TagKeys = config.FormUrlencodedTagKeys
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("form_urlencoded",
|
||||
func(defaultMetricName string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -196,11 +196,3 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
|
|||
func init() {
|
||||
parsers.Add("graphite", func(_ string) telegraf.Parser { return &Parser{} })
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.Templates = append(p.Templates, config.Templates...)
|
||||
p.Separator = config.Separator
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -574,21 +574,6 @@ func (t *tsModder) tsMod(ts time.Time) time.Time {
|
|||
return ts.Add(t.incr*t.incrn + t.rollover)
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.Measurement = config.MetricName
|
||||
p.DefaultTags = config.DefaultTags
|
||||
p.CustomPatterns = config.GrokCustomPatterns
|
||||
p.CustomPatternFiles = config.GrokCustomPatternFiles
|
||||
p.NamedPatterns = config.GrokNamedPatterns
|
||||
p.Patterns = config.GrokPatterns
|
||||
p.Timezone = config.GrokTimezone
|
||||
p.UniqueTimestamp = config.GrokUniqueTimestamp
|
||||
p.Multiline = config.GrokMultiline
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
||||
func (p *Parser) Init() error {
|
||||
if len(p.Patterns) == 0 {
|
||||
p.Patterns = []string{"%{COMBINED_LOG_FORMAT}"}
|
||||
|
|
|
|||
|
|
@ -188,13 +188,6 @@ func (p *Parser) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("influx_upstream",
|
||||
func(_ string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -158,13 +158,6 @@ func (p *Parser) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("influx",
|
||||
func(_ string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -246,18 +246,3 @@ func init() {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.MetricName = config.MetricName
|
||||
p.TagKeys = config.TagKeys
|
||||
p.NameKey = config.JSONNameKey
|
||||
p.StringFields = config.JSONStringFields
|
||||
p.Query = config.JSONQuery
|
||||
p.TimeKey = config.JSONTimeKey
|
||||
p.TimeFormat = config.JSONTimeFormat
|
||||
p.Timezone = config.JSONTimezone
|
||||
p.Strict = config.JSONStrict
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,12 +14,11 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
|
||||
)
|
||||
|
||||
// Parser adheres to the parser interface, contains the parser configuration, and data required to parse JSON
|
||||
type Parser struct {
|
||||
Configs []json_v2.Config `toml:"json_v2"`
|
||||
Configs []Config `toml:"json_v2"`
|
||||
DefaultMetricName string `toml:"-"`
|
||||
DefaultTags map[string]string `toml:"-"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
|
@ -35,13 +34,50 @@ type Parser struct {
|
|||
// iterateObjects dictates if ExpandArray function will handle objects
|
||||
iterateObjects bool
|
||||
// objectConfig contains the config for an object, some info is needed while iterating over the gjson results
|
||||
objectConfig json_v2.Object
|
||||
objectConfig Object
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
MeasurementName string `toml:"measurement_name"` // OPTIONAL
|
||||
MeasurementNamePath string `toml:"measurement_name_path"` // OPTIONAL
|
||||
TimestampPath string `toml:"timestamp_path"` // OPTIONAL
|
||||
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_path is defined
|
||||
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_path
|
||||
|
||||
Fields []DataSet `toml:"field"`
|
||||
Tags []DataSet `toml:"tag"`
|
||||
JSONObjects []Object `toml:"object"`
|
||||
|
||||
Location *time.Location
|
||||
}
|
||||
|
||||
type DataSet struct {
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
||||
Rename string `toml:"rename"`
|
||||
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||
}
|
||||
|
||||
type Object struct {
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||
TimestampKey string `toml:"timestamp_key"`
|
||||
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_path is defined
|
||||
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_path
|
||||
Renames map[string]string `toml:"renames"`
|
||||
Fields map[string]string `toml:"fields"`
|
||||
Tags []string `toml:"tags"`
|
||||
IncludedKeys []string `toml:"included_keys"`
|
||||
ExcludedKeys []string `toml:"excluded_keys"`
|
||||
DisablePrependKeys bool `toml:"disable_prepend_keys"`
|
||||
FieldPaths []DataSet `toml:"field"`
|
||||
TagPaths []DataSet `toml:"tag"`
|
||||
}
|
||||
|
||||
type PathResult struct {
|
||||
result gjson.Result
|
||||
tag bool
|
||||
json_v2.DataSet
|
||||
DataSet
|
||||
}
|
||||
|
||||
type MetricNode struct {
|
||||
|
|
@ -162,7 +198,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
|||
// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
|
||||
// A field/tag can either be a single value or an array of values, each resulting in its own metric
|
||||
// For multiple configs, a set of metrics is created from the cartesian product of each separate config
|
||||
func (p *Parser) processMetric(input []byte, data []json_v2.DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
|
||||
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
|
||||
if len(data) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -409,7 +445,7 @@ func (p *Parser) existsInpathResults(index int) *PathResult {
|
|||
}
|
||||
|
||||
// processObjects will iterate over all 'object' configs and create metrics for each
|
||||
func (p *Parser) processObjects(input []byte, objects []json_v2.Object, timestamp time.Time) ([]telegraf.Metric, error) {
|
||||
func (p *Parser) processObjects(input []byte, objects []Object, timestamp time.Time) ([]telegraf.Metric, error) {
|
||||
p.iterateObjects = true
|
||||
var t []telegraf.Metric
|
||||
for _, c := range objects {
|
||||
|
|
@ -686,17 +722,3 @@ func init() {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.DefaultMetricName = config.MetricName
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
// Convert the config formats which is a one-to-one copy
|
||||
if len(config.JSONV2Config) > 0 {
|
||||
p.Configs = make([]json_v2.Config, 0, len(config.JSONV2Config))
|
||||
p.Configs = append(p.Configs, config.JSONV2Config...)
|
||||
}
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -123,12 +123,3 @@ func init() {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.metricName = config.MetricName
|
||||
p.DefaultTags = config.DefaultTags
|
||||
p.TagKeys = append(p.TagKeys, config.LogFmtTagKeys...)
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -321,10 +321,3 @@ func init() {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.metricName = config.MetricName
|
||||
p.DefaultTags = config.DefaultTags
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -209,11 +209,6 @@ func (p *Parser) GetTimestamp(m *dto.Metric, now time.Time) time.Time {
|
|||
return t
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.IgnoreTimestamp = config.PrometheusIgnoreTimestamp
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("prometheus",
|
||||
func(defaultMetricName string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -84,10 +84,6 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
|
|||
p.DefaultTags = tags
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(_ *parsers.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("prometheusremotewrite",
|
||||
func(defaultMetricName string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -1,11 +1,7 @@
|
|||
package parsers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
|
||||
)
|
||||
|
||||
// Creator is the function to create a new parser
|
||||
|
|
@ -18,192 +14,3 @@ var Parsers = map[string]Creator{}
|
|||
func Add(name string, creator Creator) {
|
||||
Parsers[name] = creator
|
||||
}
|
||||
|
||||
type ParserFunc func() (Parser, error)
|
||||
|
||||
// ParserInput is an interface for input plugins that are able to parse
|
||||
// arbitrary data formats.
|
||||
type ParserInput interface {
|
||||
// SetParser sets the parser function for the interface
|
||||
SetParser(parser Parser)
|
||||
}
|
||||
|
||||
// ParserFuncInput is an interface for input plugins that are able to parse
|
||||
// arbitrary data formats.
|
||||
type ParserFuncInput interface {
|
||||
// SetParserFunc returns a new parser.
|
||||
SetParserFunc(fn ParserFunc)
|
||||
}
|
||||
|
||||
// Parser is an interface defining functions that a parser plugin must satisfy.
|
||||
type Parser interface {
|
||||
// Parse takes a byte buffer separated by newlines
|
||||
// ie, `cpu.usage.idle 90\ncpu.usage.busy 10`
|
||||
// and parses it into telegraf metrics
|
||||
//
|
||||
// Must be thread-safe.
|
||||
Parse(buf []byte) ([]telegraf.Metric, error)
|
||||
|
||||
// ParseLine takes a single string metric
|
||||
// ie, "cpu.usage.idle 90"
|
||||
// and parses it into a telegraf metric.
|
||||
//
|
||||
// Must be thread-safe.
|
||||
// This function is only called by plugins that expect line based protocols
|
||||
// Doesn't need to be implemented by non-linebased parsers (e.g. json, xml)
|
||||
ParseLine(line string) (telegraf.Metric, error)
|
||||
|
||||
// SetDefaultTags tells the parser to add all of the given tags
|
||||
// to each parsed metric.
|
||||
// NOTE: do _not_ modify the map after you've passed it here!!
|
||||
SetDefaultTags(tags map[string]string)
|
||||
}
|
||||
|
||||
// ParserCompatibility is an interface for backward-compatible initialization of new parsers
|
||||
type ParserCompatibility interface {
|
||||
// InitFromConfig sets the parser internal variables from the old-style config
|
||||
InitFromConfig(config *Config) error
|
||||
}
|
||||
|
||||
// Config is a struct that covers the data types needed for all parser types,
|
||||
// and can be used to instantiate _any_ of the parsers.
|
||||
type Config struct {
|
||||
// DataFormat can be one of: avro, json, influx, graphite, value, nagios, opentsdb
|
||||
DataFormat string `toml:"data_format"`
|
||||
|
||||
// Separator only applied to Graphite data.
|
||||
Separator string `toml:"separator"`
|
||||
// Templates only apply to Graphite data.
|
||||
Templates []string `toml:"templates"`
|
||||
|
||||
// TagKeys only apply to JSON data
|
||||
TagKeys []string `toml:"tag_keys"`
|
||||
// Array of glob pattern strings keys that should be added as string fields.
|
||||
JSONStringFields []string `toml:"json_string_fields"`
|
||||
|
||||
JSONNameKey string `toml:"json_name_key"`
|
||||
// MetricName applies to JSON & value. This will be the name of the measurement.
|
||||
MetricName string `toml:"metric_name"`
|
||||
|
||||
// holds a gjson path for json parser
|
||||
JSONQuery string `toml:"json_query"`
|
||||
|
||||
// key of time
|
||||
JSONTimeKey string `toml:"json_time_key"`
|
||||
|
||||
// time format
|
||||
JSONTimeFormat string `toml:"json_time_format"`
|
||||
|
||||
// default timezone
|
||||
JSONTimezone string `toml:"json_timezone"`
|
||||
|
||||
// Whether to continue if a JSON object can't be coerced
|
||||
JSONStrict bool `toml:"json_strict"`
|
||||
|
||||
// Authentication file for collectd
|
||||
CollectdAuthFile string `toml:"collectd_auth_file"`
|
||||
// One of none (default), sign, or encrypt
|
||||
CollectdSecurityLevel string `toml:"collectd_security_level"`
|
||||
// Dataset specification for collectd
|
||||
CollectdTypesDB []string `toml:"collectd_types_db"`
|
||||
|
||||
// whether to split or join multivalue metrics
|
||||
CollectdSplit string `toml:"collectd_split"`
|
||||
|
||||
// DataType only applies to value, this will be the type to parse value to
|
||||
DataType string `toml:"data_type"`
|
||||
|
||||
// DefaultTags are the default tags that will be added to all parsed metrics.
|
||||
DefaultTags map[string]string `toml:"default_tags"`
|
||||
|
||||
// an optional json path containing the metric registry object
|
||||
// if left empty, the whole json object is parsed as a metric registry
|
||||
DropwizardMetricRegistryPath string `toml:"dropwizard_metric_registry_path"`
|
||||
// an optional json path containing the default time of the metrics
|
||||
// if left empty, the processing time is used
|
||||
DropwizardTimePath string `toml:"dropwizard_time_path"`
|
||||
// time format to use for parsing the time field
|
||||
// defaults to time.RFC3339
|
||||
DropwizardTimeFormat string `toml:"dropwizard_time_format"`
|
||||
// an optional json path pointing to a json object with tag key/value pairs
|
||||
// takes precedence over DropwizardTagPathsMap
|
||||
DropwizardTagsPath string `toml:"dropwizard_tags_path"`
|
||||
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
||||
// used if TagsPath is empty or doesn't return any tags
|
||||
DropwizardTagPathsMap map[string]string `toml:"dropwizard_tag_paths_map"`
|
||||
|
||||
//grok patterns
|
||||
GrokPatterns []string `toml:"grok_patterns"`
|
||||
GrokNamedPatterns []string `toml:"grok_named_patterns"`
|
||||
GrokCustomPatterns string `toml:"grok_custom_patterns"`
|
||||
GrokCustomPatternFiles []string `toml:"grok_custom_pattern_files"`
|
||||
GrokTimezone string `toml:"grok_timezone"`
|
||||
GrokUniqueTimestamp string `toml:"grok_unique_timestamp"`
|
||||
GrokMultiline bool `toml:"grok_multiline"`
|
||||
|
||||
//csv configuration
|
||||
CSVColumnNames []string `toml:"csv_column_names"`
|
||||
CSVColumnTypes []string `toml:"csv_column_types"`
|
||||
CSVComment string `toml:"csv_comment"`
|
||||
CSVDelimiter string `toml:"csv_delimiter"`
|
||||
CSVHeaderRowCount int `toml:"csv_header_row_count"`
|
||||
CSVMeasurementColumn string `toml:"csv_measurement_column"`
|
||||
CSVSkipColumns int `toml:"csv_skip_columns"`
|
||||
CSVSkipRows int `toml:"csv_skip_rows"`
|
||||
CSVTagColumns []string `toml:"csv_tag_columns"`
|
||||
CSVTagOverwrite bool `toml:"csv_tag_overwrite"`
|
||||
CSVTimestampColumn string `toml:"csv_timestamp_column"`
|
||||
CSVTimestampFormat string `toml:"csv_timestamp_format"`
|
||||
CSVTimezone string `toml:"csv_timezone"`
|
||||
CSVTrimSpace bool `toml:"csv_trim_space"`
|
||||
CSVSkipValues []string `toml:"csv_skip_values"`
|
||||
CSVSkipErrors bool `toml:"csv_skip_errors"`
|
||||
CSVMetadataRows int `toml:"csv_metadata_rows"`
|
||||
CSVMetadataSeparators []string `toml:"csv_metadata_separators"`
|
||||
CSVMetadataTrimSet string `toml:"csv_metadata_trim_set"`
|
||||
|
||||
// FormData configuration
|
||||
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
|
||||
|
||||
// Prometheus configuration
|
||||
PrometheusIgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"`
|
||||
|
||||
// Value configuration
|
||||
ValueFieldName string `toml:"value_field_name"`
|
||||
|
||||
// XPath configuration
|
||||
XPathPrintDocument bool `toml:"xpath_print_document"`
|
||||
XPathProtobufFile string `toml:"xpath_protobuf_file"`
|
||||
XPathProtobufType string `toml:"xpath_protobuf_type"`
|
||||
XPathProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
|
||||
XPathAllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
|
||||
XPathConfig []xpath.Config `toml:"xpath"`
|
||||
|
||||
// JSONPath configuration
|
||||
JSONV2Config []json_v2.Config `toml:"json_v2"`
|
||||
|
||||
// Influx configuration
|
||||
InfluxParserType string `toml:"influx_parser_type"`
|
||||
|
||||
// LogFmt configuration
|
||||
LogFmtTagKeys []string `toml:"logfmt_tag_keys"`
|
||||
}
|
||||
|
||||
// NewParser returns a Parser interface based on the given config.
|
||||
// DEPRECATED: Please instantiate the parser directly instead of using this function.
|
||||
func NewParser(config *Config) (Parser, error) {
|
||||
creator, found := Parsers[config.DataFormat]
|
||||
if !found {
|
||||
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
|
||||
}
|
||||
|
||||
// Try to create new-style parsers the old way...
|
||||
parser := creator(config.MetricName)
|
||||
p, ok := parser.(ParserCompatibility)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat)
|
||||
}
|
||||
err := p.InitFromConfig(config)
|
||||
|
||||
return parser, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,93 +0,0 @@
|
|||
package parsers_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/choice"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
||||
)
|
||||
|
||||
func TestRegistry_BackwardCompatibility(t *testing.T) {
|
||||
cfg := &parsers.Config{
|
||||
MetricName: "parser_compatibility_test",
|
||||
CSVHeaderRowCount: 42,
|
||||
XPathProtobufFile: "xpath/testcases/protos/addressbook.proto",
|
||||
XPathProtobufType: "addressbook.AddressBook",
|
||||
JSONStrict: true,
|
||||
}
|
||||
|
||||
// Some parsers need certain settings to not error. Furthermore, we
|
||||
// might need to clear some (pointer) fields for comparison...
|
||||
override := map[string]struct {
|
||||
param map[string]interface{}
|
||||
mask []string
|
||||
}{
|
||||
"csv": {
|
||||
param: map[string]interface{}{
|
||||
"HeaderRowCount": cfg.CSVHeaderRowCount,
|
||||
},
|
||||
mask: []string{"TimeFunc"},
|
||||
},
|
||||
"xpath_protobuf": {
|
||||
param: map[string]interface{}{
|
||||
"ProtobufMessageDef": cfg.XPathProtobufFile,
|
||||
"ProtobufMessageType": cfg.XPathProtobufType,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Define parsers that do not have an old-school init
|
||||
newStyleOnly := []string{"binary", "avro", "opentsdb"}
|
||||
for name, creator := range parsers.Parsers {
|
||||
if choice.Contains(name, newStyleOnly) {
|
||||
t.Logf("skipping new-style-only %q...", name)
|
||||
continue
|
||||
}
|
||||
t.Logf("testing %q...", name)
|
||||
cfg.DataFormat = name
|
||||
|
||||
// Create parser the new way
|
||||
expected := creator(cfg.MetricName)
|
||||
if settings, found := override[name]; found {
|
||||
s := reflect.Indirect(reflect.ValueOf(expected))
|
||||
for key, value := range settings.param {
|
||||
v := reflect.ValueOf(value)
|
||||
s.FieldByName(key).Set(v)
|
||||
}
|
||||
}
|
||||
if p, ok := expected.(telegraf.Initializer); ok {
|
||||
require.NoError(t, p.Init())
|
||||
}
|
||||
|
||||
// Create parser the old way
|
||||
actual, err := parsers.NewParser(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Determine the underlying type of the parser
|
||||
stype := reflect.Indirect(reflect.ValueOf(expected)).Interface()
|
||||
// Ignore all unexported fields and fields not relevant for functionality
|
||||
options := []cmp.Option{
|
||||
cmpopts.IgnoreUnexported(stype),
|
||||
cmpopts.IgnoreTypes(sync.Mutex{}),
|
||||
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
|
||||
}
|
||||
|
||||
// Add overrides and masks to compare options
|
||||
if settings, found := override[name]; found {
|
||||
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
|
||||
}
|
||||
|
||||
// Do a manual comparison as require.EqualValues will also work on unexported fields
|
||||
// that cannot be cleared or ignored.
|
||||
diff := cmp.Diff(expected, actual, options...)
|
||||
require.Emptyf(t, diff, "Difference for %q", name)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
package json_v2
|
||||
|
||||
import "time"
|
||||
|
||||
// Config definition for backward compatibility ONLY.
|
||||
// We need this here to avoid cyclic dependencies. However, we need
|
||||
// to move this to plugins/parsers/json_v2 once we deprecate parser
|
||||
// construction via `NewParser()`.
|
||||
type Config struct {
|
||||
MeasurementName string `toml:"measurement_name"` // OPTIONAL
|
||||
MeasurementNamePath string `toml:"measurement_name_path"` // OPTIONAL
|
||||
TimestampPath string `toml:"timestamp_path"` // OPTIONAL
|
||||
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_path is defined
|
||||
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_path
|
||||
|
||||
Fields []DataSet `toml:"field"`
|
||||
Tags []DataSet `toml:"tag"`
|
||||
JSONObjects []Object `toml:"object"`
|
||||
|
||||
Location *time.Location
|
||||
}
|
||||
|
||||
type DataSet struct {
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
||||
Rename string `toml:"rename"`
|
||||
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||
}
|
||||
|
||||
type Object struct {
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||
TimestampKey string `toml:"timestamp_key"`
|
||||
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_path is defined
|
||||
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_path
|
||||
Renames map[string]string `toml:"renames"`
|
||||
Fields map[string]string `toml:"fields"`
|
||||
Tags []string `toml:"tags"`
|
||||
IncludedKeys []string `toml:"included_keys"`
|
||||
ExcludedKeys []string `toml:"excluded_keys"`
|
||||
DisablePrependKeys bool `toml:"disable_prepend_keys"`
|
||||
FieldPaths []DataSet `toml:"field"`
|
||||
TagPaths []DataSet `toml:"tag"`
|
||||
}
|
||||
|
|
@ -1,36 +0,0 @@
|
|||
package xpath
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
)
|
||||
|
||||
// Config definition for backward compatibility ONLY.
|
||||
// We need this here to avoid cyclic dependencies. However, we need
|
||||
// to move this to plugins/parsers/xpath once we deprecate parser
|
||||
// construction via `NewParser()`.
|
||||
type Config struct {
|
||||
MetricQuery string `toml:"metric_name"`
|
||||
Selection string `toml:"metric_selection"`
|
||||
Timestamp string `toml:"timestamp"`
|
||||
TimestampFmt string `toml:"timestamp_format"`
|
||||
Timezone string `toml:"timezone"`
|
||||
Tags map[string]string `toml:"tags"`
|
||||
Fields map[string]string `toml:"fields"`
|
||||
FieldsInt map[string]string `toml:"fields_int"`
|
||||
FieldsHex []string `toml:"fields_bytes_as_hex"`
|
||||
|
||||
FieldSelection string `toml:"field_selection"`
|
||||
FieldNameQuery string `toml:"field_name"`
|
||||
FieldValueQuery string `toml:"field_value"`
|
||||
FieldNameExpand bool `toml:"field_name_expansion"`
|
||||
|
||||
TagSelection string `toml:"tag_selection"`
|
||||
TagNameQuery string `toml:"tag_name"`
|
||||
TagValueQuery string `toml:"tag_value"`
|
||||
TagNameExpand bool `toml:"tag_name_expansion"`
|
||||
|
||||
FieldsHexFilter filter.Filter
|
||||
Location *time.Location
|
||||
}
|
||||
|
|
@ -73,13 +73,6 @@ func (v *Parser) SetDefaultTags(tags map[string]string) {
|
|||
v.DefaultTags = tags
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (v *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
v.MetricName = config.MetricName
|
||||
v.DefaultTags = config.DefaultTags
|
||||
return v.Init()
|
||||
}
|
||||
|
||||
func (v *Parser) Init() error {
|
||||
if v.FieldName == "" {
|
||||
v.FieldName = "value"
|
||||
|
|
|
|||
|
|
@ -215,10 +215,6 @@ func (p *PointParser) reset(buf []byte) {
|
|||
p.buf.n = 0
|
||||
}
|
||||
|
||||
func (p *Parser) InitFromConfig(_ *parsers.Config) error {
|
||||
return p.Init()
|
||||
}
|
||||
|
||||
func init() {
|
||||
parsers.Add("wavefront",
|
||||
func(_ string) telegraf.Parser {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
|
||||
)
|
||||
|
||||
type dataNode interface{}
|
||||
|
|
@ -40,20 +39,45 @@ type Parser struct {
|
|||
PrintDocument bool `toml:"xpath_print_document"`
|
||||
AllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
|
||||
NativeTypes bool `toml:"xpath_native_types"`
|
||||
Configs []xpath.Config `toml:"xpath"`
|
||||
Configs []Config `toml:"xpath"`
|
||||
DefaultMetricName string `toml:"-"`
|
||||
DefaultTags map[string]string `toml:"-"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
// Required for backward compatibility
|
||||
ConfigsXML []xpath.Config `toml:"xml" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsJSON []xpath.Config `toml:"xpath_json" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsMsgPack []xpath.Config `toml:"xpath_msgpack" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsProto []xpath.Config `toml:"xpath_protobuf" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsXML []Config `toml:"xml" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsJSON []Config `toml:"xpath_json" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsMsgPack []Config `toml:"xpath_msgpack" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
ConfigsProto []Config `toml:"xpath_protobuf" deprecated:"1.23.1;use 'xpath' instead"`
|
||||
|
||||
document dataDocument
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
MetricQuery string `toml:"metric_name"`
|
||||
Selection string `toml:"metric_selection"`
|
||||
Timestamp string `toml:"timestamp"`
|
||||
TimestampFmt string `toml:"timestamp_format"`
|
||||
Timezone string `toml:"timezone"`
|
||||
Tags map[string]string `toml:"tags"`
|
||||
Fields map[string]string `toml:"fields"`
|
||||
FieldsInt map[string]string `toml:"fields_int"`
|
||||
FieldsHex []string `toml:"fields_bytes_as_hex"`
|
||||
|
||||
FieldSelection string `toml:"field_selection"`
|
||||
FieldNameQuery string `toml:"field_name"`
|
||||
FieldValueQuery string `toml:"field_value"`
|
||||
FieldNameExpand bool `toml:"field_name_expansion"`
|
||||
|
||||
TagSelection string `toml:"tag_selection"`
|
||||
TagNameQuery string `toml:"tag_name"`
|
||||
TagValueQuery string `toml:"tag_value"`
|
||||
TagNameExpand bool `toml:"tag_name_expansion"`
|
||||
|
||||
FieldsHexFilter filter.Filter
|
||||
Location *time.Location
|
||||
}
|
||||
|
||||
func (p *Parser) Init() error {
|
||||
switch p.Format {
|
||||
case "", "xml":
|
||||
|
|
@ -211,7 +235,7 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
|
|||
p.DefaultTags = tags
|
||||
}
|
||||
|
||||
func (p *Parser) parseQuery(starttime time.Time, doc, selected dataNode, config xpath.Config) (telegraf.Metric, error) {
|
||||
func (p *Parser) parseQuery(starttime time.Time, doc, selected dataNode, config Config) (telegraf.Metric, error) {
|
||||
var timestamp time.Time
|
||||
var metricname string
|
||||
|
||||
|
|
@ -596,23 +620,3 @@ func init() {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||
p.Format = config.DataFormat
|
||||
if p.Format == "xpath_protobuf" {
|
||||
p.ProtobufMessageDef = config.XPathProtobufFile
|
||||
p.ProtobufMessageType = config.XPathProtobufType
|
||||
}
|
||||
p.PrintDocument = config.XPathPrintDocument
|
||||
p.DefaultMetricName = config.MetricName
|
||||
p.DefaultTags = config.DefaultTags
|
||||
|
||||
// Convert the config formats which is a one-to-one copy
|
||||
if len(config.XPathConfig) > 0 {
|
||||
p.Configs = make([]xpath.Config, 0, len(config.XPathConfig))
|
||||
p.Configs = append(p.Configs, config.XPathConfig...)
|
||||
}
|
||||
|
||||
return p.Init()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/file"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/influxdata/toml"
|
||||
|
||||
|
|
@ -111,14 +110,14 @@ func TestParseInvalidXML(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "invalid XML (missing close tag)",
|
||||
input: invalidXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
MetricQuery: "test",
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
|
|
@ -150,14 +149,14 @@ func TestInvalidTypeQueriesFail(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "invalid field (int) type",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
FieldsInt: map[string]string{
|
||||
|
|
@ -191,14 +190,14 @@ func TestInvalidTypeQueries(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "invalid field type (number)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -219,7 +218,7 @@ func TestInvalidTypeQueries(t *testing.T) {
|
|||
{
|
||||
name: "invalid field type (boolean)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -261,14 +260,14 @@ func TestParseTimestamps(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "parse timestamp (no fmt)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
},
|
||||
|
|
@ -284,7 +283,7 @@ func TestParseTimestamps(t *testing.T) {
|
|||
{
|
||||
name: "parse timestamp (unix)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
TimestampFmt: "unix",
|
||||
|
|
@ -301,7 +300,7 @@ func TestParseTimestamps(t *testing.T) {
|
|||
{
|
||||
name: "parse timestamp (unix_ms)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix_ms",
|
||||
TimestampFmt: "unix_ms",
|
||||
|
|
@ -318,7 +317,7 @@ func TestParseTimestamps(t *testing.T) {
|
|||
{
|
||||
name: "parse timestamp (unix_us)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix_us",
|
||||
TimestampFmt: "unix_us",
|
||||
|
|
@ -335,7 +334,7 @@ func TestParseTimestamps(t *testing.T) {
|
|||
{
|
||||
name: "parse timestamp (unix_us)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix_ns",
|
||||
TimestampFmt: "unix_ns",
|
||||
|
|
@ -352,7 +351,7 @@ func TestParseTimestamps(t *testing.T) {
|
|||
{
|
||||
name: "parse timestamp (RFC3339)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_iso",
|
||||
TimestampFmt: "2006-01-02T15:04:05Z",
|
||||
|
|
@ -390,14 +389,14 @@ func TestParseSingleValues(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "parse scalar values as string fields",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -424,7 +423,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse scalar values as typed fields (w/o int)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -451,7 +450,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse values as typed fields (w/ int)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -480,7 +479,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse substring values",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -503,7 +502,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse substring values (typed)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -526,7 +525,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse substring values (typed int)",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
FieldsInt: map[string]string{
|
||||
|
|
@ -549,7 +548,7 @@ func TestParseSingleValues(t *testing.T) {
|
|||
{
|
||||
name: "parse tags",
|
||||
input: singleMetricValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
Tags: map[string]string{
|
||||
|
|
@ -593,14 +592,14 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "parse attr timestamp (unix)",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
},
|
||||
|
|
@ -616,7 +615,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr timestamp (RFC3339)",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_iso/@value",
|
||||
TimestampFmt: "2006-01-02T15:04:05Z",
|
||||
|
|
@ -633,7 +632,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr as string fields",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -660,7 +659,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr as typed fields (w/o int)",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -687,7 +686,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr as typed fields (w/ int)",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -716,7 +715,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr substring",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -737,7 +736,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr tags",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Tags: map[string]string{
|
||||
|
|
@ -760,7 +759,7 @@ func TestParseSingleAttributes(t *testing.T) {
|
|||
{
|
||||
name: "parse attr bool",
|
||||
input: singleMetricAttributesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Device_1/Timestamp_unix/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -802,14 +801,14 @@ func TestParseMultiValues(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "select values (float)",
|
||||
input: singleMetricMultiValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Timestamp/@value",
|
||||
Fields: map[string]string{
|
||||
|
|
@ -840,7 +839,7 @@ func TestParseMultiValues(t *testing.T) {
|
|||
{
|
||||
name: "select values (int)",
|
||||
input: singleMetricMultiValuesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Timestamp: "/Timestamp/@value",
|
||||
FieldsInt: map[string]string{
|
||||
|
|
@ -892,14 +891,14 @@ func TestParseMultiNodes(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected []telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "select all devices",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device",
|
||||
Timestamp: "/Timestamp/@value",
|
||||
|
|
@ -1009,14 +1008,14 @@ func TestParseMetricQuery(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
defaultTags map[string]string
|
||||
expected telegraf.Metric
|
||||
}{
|
||||
{
|
||||
name: "parse metric name query",
|
||||
input: metricNameQueryXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
MetricQuery: "name(/Device_1/Metric/@*[1])",
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
|
|
@ -1038,7 +1037,7 @@ func TestParseMetricQuery(t *testing.T) {
|
|||
{
|
||||
name: "parse metric name constant",
|
||||
input: metricNameQueryXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
MetricQuery: "'the_metric'",
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
|
|
@ -1081,13 +1080,13 @@ func TestParseErrors(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "string metric name query",
|
||||
input: metricNameQueryXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
MetricQuery: "arbitrary",
|
||||
Timestamp: "/Device_1/Timestamp_unix",
|
||||
|
|
@ -1121,12 +1120,12 @@ func TestEmptySelection(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
}{
|
||||
{
|
||||
name: "empty path",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device/NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1138,7 +1137,7 @@ func TestEmptySelection(t *testing.T) {
|
|||
{
|
||||
name: "empty pattern",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "//NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1150,7 +1149,7 @@ func TestEmptySelection(t *testing.T) {
|
|||
{
|
||||
name: "empty axis",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device/child::NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1162,7 +1161,7 @@ func TestEmptySelection(t *testing.T) {
|
|||
{
|
||||
name: "empty predicate",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device[@NonExisting=true]",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1194,12 +1193,12 @@ func TestEmptySelectionAllowed(t *testing.T) {
|
|||
var tests = []struct {
|
||||
name string
|
||||
input string
|
||||
configs []xpath.Config
|
||||
configs []Config
|
||||
}{
|
||||
{
|
||||
name: "empty path",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device/NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1211,7 +1210,7 @@ func TestEmptySelectionAllowed(t *testing.T) {
|
|||
{
|
||||
name: "empty pattern",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "//NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1223,7 +1222,7 @@ func TestEmptySelectionAllowed(t *testing.T) {
|
|||
{
|
||||
name: "empty axis",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device/child::NonExisting",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1235,7 +1234,7 @@ func TestEmptySelectionAllowed(t *testing.T) {
|
|||
{
|
||||
name: "empty predicate",
|
||||
input: multipleNodesXML,
|
||||
configs: []xpath.Config{
|
||||
configs: []Config{
|
||||
{
|
||||
Selection: "/Device[@NonExisting=true]",
|
||||
Fields: map[string]string{"value": "number(Value)"},
|
||||
|
|
@ -1360,7 +1359,7 @@ func TestTestCases(t *testing.T) {
|
|||
Format: fileformat,
|
||||
ProtobufMessageDef: pbmsgdef,
|
||||
ProtobufMessageType: pbmsgtype,
|
||||
Configs: []xpath.Config{*cfg},
|
||||
Configs: []Config{*cfg},
|
||||
Log: testutil.Logger{Name: "parsers.xml"},
|
||||
}
|
||||
require.NoError(t, parser.Init())
|
||||
|
|
@ -1386,7 +1385,7 @@ func TestProtobufImporting(t *testing.T) {
|
|||
ProtobufMessageDef: "person.proto",
|
||||
ProtobufMessageType: "importtest.Person",
|
||||
ProtobufImportPaths: []string{"testcases/protos"},
|
||||
Configs: []xpath.Config{},
|
||||
Configs: []Config{},
|
||||
Log: testutil.Logger{Name: "parsers.protobuf"},
|
||||
}
|
||||
require.NoError(t, parser.Init())
|
||||
|
|
@ -1477,7 +1476,7 @@ func TestMultipleConfigs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func loadTestConfiguration(filename string) (*xpath.Config, []string, error) {
|
||||
func loadTestConfiguration(filename string) (*Config, []string, error) {
|
||||
buf, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
|
@ -1490,7 +1489,7 @@ func loadTestConfiguration(filename string) (*xpath.Config, []string, error) {
|
|||
header = append(header, line)
|
||||
}
|
||||
}
|
||||
cfg := xpath.Config{}
|
||||
cfg := Config{}
|
||||
err = toml.Unmarshal(buf, &cfg)
|
||||
return &cfg, header, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue