feat: Migrate json parser to new style (#11226)

Sven Rebhan 2022-06-22 17:56:51 +02:00 committed by GitHub
parent 336ae2f110
commit c46f42f797
16 changed files with 406 additions and 409 deletions

View File

@ -1750,8 +1750,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"grace", "graphite_separator", "graphite_tag_sanitize_mode", "graphite_tag_support",
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_parser_type", "influx_sort_fields",
"influx_uint_support", "interval", "json_name_key", "json_query", "json_strict",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2",
"influx_uint_support", "interval", "json_timestamp_units", "json_v2",
"lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
"name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision",
"prefix", "prometheus_export_timestamp", "prometheus_ignore_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -116,13 +117,10 @@ func TestMultipleJSONFileImports(t *testing.T) {
err := r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "Name",
}
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
p := &json.Parser{NameKey: "Name"}
err := p.Init()
return p, err
})
// Let's drop a 5-line LINE-DELIMITED json.
@ -166,13 +164,10 @@ func TestFileTag(t *testing.T) {
err := r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "Name",
}
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
p := &json.Parser{NameKey: "Name"}
err := p.Init()
return p, err
})
// Let's drop a 1-line LINE-DELIMITED json.

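The same before/after shape repeats across nearly every test touched by this commit. Consolidated from the hunks above, with r standing for the plugin under test and the option value taken from the file-input test:

// Old style: go through the central registry configuration.
r.SetParserFunc(func() (parsers.Parser, error) {
    return parsers.NewParser(&parsers.Config{
        DataFormat:  "json",
        JSONNameKey: "Name",
    })
})

// New style: build the parser struct directly and call Init(), which compiles
// the tag-key and string-field filters before the plugin starts using it.
r.SetParserFunc(func() (parsers.Parser, error) {
    p := &json.Parser{NameKey: "Name"}
    err := p.Init()
    return p, err
})
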
View File

@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -80,10 +81,8 @@ func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte,
}
func TestExec(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{},
runner: newRunnerMock([]byte(validJSON), nil, nil),
@ -110,10 +109,8 @@ func TestExec(t *testing.T) {
}
func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{},
runner: newRunnerMock([]byte(malformedJSON), nil, nil),
@ -127,10 +124,8 @@ func TestExecMalformed(t *testing.T) {
}
func TestCommandError(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{},
runner: newRunnerMock(nil, nil, fmt.Errorf("exit status code 1")),
@ -144,14 +139,14 @@ func TestCommandError(t *testing.T) {
}
func TestExecCommandWithGlob(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
parser, err := parsers.NewValueParser("metric", "string", "", nil)
require.NoError(t, err)
e := NewExec()
e.Commands = []string{"/bin/ech* metric_value"}
e.SetParser(parser)
var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",
@ -160,14 +155,15 @@ func TestExecCommandWithGlob(t *testing.T) {
}
func TestExecCommandWithoutGlob(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
parser, err := parsers.NewValueParser("metric", "string", "", nil)
require.NoError(t, err)
e := NewExec()
e.Commands = []string{"/bin/echo metric_value"}
e.SetParser(parser)
var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",
@ -176,14 +172,14 @@ func TestExecCommandWithoutGlob(t *testing.T) {
}
func TestExecCommandWithoutGlobAndPath(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
parser, err := parsers.NewValueParser("metric", "string", "", nil)
require.NoError(t, err)
e := NewExec()
e.Commands = []string{"echo metric_value"}
e.SetParser(parser)
var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",
@ -192,15 +188,15 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
}
func TestExecCommandWithEnv(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
parser, err := parsers.NewValueParser("metric", "string", "", nil)
require.NoError(t, err)
e := NewExec()
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
e.Environment = []string{"METRIC_NAME=metric_value"}
e.SetParser(parser)
var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -43,16 +44,15 @@ func TestFileTag(t *testing.T) {
Files: []string{filepath.Join(wd, "dev/testfiles/json_a.log")},
FileTag: "filename",
}
err = r.Init()
require.NoError(t, err)
require.NoError(t, r.Init())
parserConfig := parsers.Config{
DataFormat: "json",
}
r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) })
r.SetParserFunc(func() (telegraf.Parser, error) {
p := &json.Parser{}
err := p.Init()
return p, err
})
err = r.Gather(&acc)
require.NoError(t, err)
require.NoError(t, r.Gather(&acc))
for _, m := range acc.Metrics {
for key, value := range m.Tags {
@ -68,13 +68,13 @@ func TestJSONParserCompile(t *testing.T) {
r := File{
Files: []string{filepath.Join(wd, "dev/testfiles/json_a.log")},
}
err := r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "json",
TagKeys: []string{"parent_ignored_child"},
}
r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) })
require.NoError(t, r.Init())
r.SetParserFunc(func() (telegraf.Parser, error) {
p := &json.Parser{TagKeys: []string{"parent_ignored_child"}}
err := p.Init()
return p, err
})
require.NoError(t, r.Gather(&acc))
require.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)

View File

@ -18,6 +18,7 @@ import (
httpplugin "github.com/influxdata/telegraf/plugins/inputs/http"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -39,10 +40,9 @@ func TestHTTPWithJSONFormat(t *testing.T) {
metricName := "metricName"
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator
@ -83,10 +83,9 @@ func TestHTTPHeaders(t *testing.T) {
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator
@ -117,10 +116,9 @@ func TestHTTPContentLengthHeader(t *testing.T) {
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator
@ -141,10 +139,9 @@ func TestInvalidStatusCode(t *testing.T) {
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator
@ -166,10 +163,9 @@ func TestSuccessStatusCodes(t *testing.T) {
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator
@ -194,10 +190,9 @@ func TestMethod(t *testing.T) {
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
return parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p := &json.Parser{MetricName: "metricName"}
err := p.Init()
return p, err
})
var acc testutil.Accumulator

View File

@ -16,7 +16,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
)
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
@ -136,13 +136,12 @@ func (h *HTTPJSON) gatherServer(
"server": serverURL,
}
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
parser := &json.Parser{
MetricName: msrmntName,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
}
if err := parser.Init(); err != nil {
return err
}

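Plugins that construct the parser themselves, such as httpjson above, now do so in-line with per-request default tags. A hedged, self-contained sketch of that pattern; the package name, function name and tag values are illustrative and not part of the diff:

package example

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/parsers/json"
)

// parseServerResponse builds a fresh JSON parser that carries per-server
// default tags, initializes it, and parses the response body.
func parseServerResponse(body []byte, measurement, serverURL string, tagKeys []string) ([]telegraf.Metric, error) {
    parser := &json.Parser{
        MetricName:  measurement,
        TagKeys:     tagKeys,
        DefaultTags: map[string]string{"server": serverURL},
    }
    if err := parser.Init(); err != nil {
        return nil, err
    }
    return parser.Parse(body)
}
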
View File

@ -7,6 +7,7 @@ import (
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
@ -42,7 +43,9 @@ func TestRunParser(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
@ -57,7 +60,9 @@ func TestRunParserInvalidMsg(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
go k.receiver()
in <- saramaMsg(invalidMsg)
acc.WaitError(1)
@ -89,7 +94,9 @@ func TestRunParserAndGather(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
@ -108,7 +115,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
var err error
k.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
go k.receiver()
in <- saramaMsg(testMsgGraphite)
acc.Wait(1)
@ -127,10 +136,11 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
parser := &json.Parser{
MetricName: "kafka_json_test",
})
}
require.NoError(t, parser.Init())
k.parser = parser
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)

View File

@ -23,11 +23,12 @@ func TestKinesisConsumer_onMessage(t *testing.T) {
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"},
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"}
]}`)
parser, _ := json.New(&json.Config{
parser := &json.Parser{
MetricName: "json_test",
Query: "logEvents",
StringFields: []string{"message"},
})
}
require.NoError(t, parser.Init())
type fields struct {
ContentEncoding string

View File

@ -458,10 +458,9 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
plugin.Files = []string{tmpfile.Name()}
plugin.PathTag = "customPathTagMyFile"
plugin.SetParserFunc(func() (parsers.Parser, error) {
return json.New(
&json.Config{
MetricName: "cpu",
})
p := &json.Parser{MetricName: "cpu"}
err := p.Init()
return p, err
})
err = plugin.Init()

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -45,7 +46,9 @@ func BenchmarkTCP(b *testing.B) {
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(b, err)
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
@ -75,7 +78,9 @@ func TestHighTrafficTCP(t *testing.T) {
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
acc := &testutil.Accumulator{}
// send multiple messages to socket
@ -103,7 +108,9 @@ func TestConnectTCP(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -143,13 +150,15 @@ func TestConcurrentConns(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err := net.Dial("tcp", "127.0.0.1:8195")
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
@ -180,13 +189,15 @@ func TestConcurrentConns1(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err := net.Dial("tcp", "127.0.0.1:8196")
_, err = net.Dial("tcp", "127.0.0.1:8196")
require.NoError(t, err)
// Connection over the limit:
@ -215,12 +226,14 @@ func TestCloseConcurrentConns(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
_, err := net.Dial("tcp", "127.0.0.1:8195")
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
@ -236,7 +249,9 @@ func TestRunParser(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
listener.wg.Add(1)
go listener.tcpParser()
@ -257,7 +272,9 @@ func TestRunParserInvalidMsg(t *testing.T) {
listener.Log = &testutil.CaptureLogger{}
listener.acc = &testutil.Accumulator{}
listener.parser, _ = parsers.NewInfluxParser()
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
listener.wg.Add(1)
go listener.tcpParser()
@ -276,7 +293,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
var err error
listener.parser, err = parsers.NewGraphiteParser("_", []string{}, nil)
require.NoError(t, err)
listener.wg.Add(1)
go listener.tcpParser()
@ -296,10 +315,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
parser := &json.Parser{MetricName: "udp_json_test"}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()

View File

@ -3,5 +3,6 @@ package all
import (
//Blank imports for plugins to register themselves
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/xpath"
)

View File

@ -0,0 +1,77 @@
package json
import (
"fmt"
"strconv"
)
type JSONFlattener struct {
Fields map[string]interface{}
}
// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string)
func (f *JSONFlattener) FlattenJSON(
fieldname string,
v interface{}) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
return f.FullFlattenJSON(fieldname, v, false, false)
}
// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string)
func (f *JSONFlattener) FullFlattenJSON(
fieldname string,
v interface{},
convertString bool,
convertBool bool,
) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
fieldkey := k
if fieldname != "" {
fieldkey = fieldname + "_" + fieldkey
}
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
if err != nil {
return err
}
}
case []interface{}:
for i, v := range t {
fieldkey := strconv.Itoa(i)
if fieldname != "" {
fieldkey = fieldname + "_" + fieldkey
}
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
if err != nil {
return err
}
}
case float64:
f.Fields[fieldname] = t
case string:
if !convertString {
return nil
}
f.Fields[fieldname] = v.(string)
case bool:
if !convertBool {
return nil
}
f.Fields[fieldname] = v.(bool)
case nil:
return nil
default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
t, t, fieldname)
}
return nil
}

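The flattener moves into its own file essentially unchanged. A small standalone sketch of how it behaves, using an assumed example document: FullFlattenJSON keeps strings and bools when the two convert flags are set, whereas FlattenJSON drops them.

package main

import (
    ejson "encoding/json" // aliased to avoid clashing with the parser package
    "fmt"

    "github.com/influxdata/telegraf/plugins/parsers/json"
)

func main() {
    var data interface{}
    if err := ejson.Unmarshal([]byte(`{"a":{"b":1,"c":"x","ok":true}}`), &data); err != nil {
        panic(err)
    }

    // Keys are joined with "_" while descending, so the nested values land in
    // a_b, a_c and a_ok; numbers arrive as float64 from encoding/json.
    f := json.JSONFlattener{}
    if err := f.FullFlattenJSON("", data, true, true); err != nil {
        panic(err)
    }
    fmt.Println(f.Fields) // map[a_b:1 a_c:x a_ok:true]
}
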
View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
var (
@ -21,57 +22,22 @@ var (
ErrWrongType = errors.New("must be an object or an array of objects")
)
type Config struct {
MetricName string
TagKeys []string
NameKey string
StringFields []string
Query string
TimeKey string
TimeFormat string
Timezone string
DefaultTags map[string]string
Strict bool
}
type Parser struct {
metricName string
tagKeys filter.Filter
stringFields filter.Filter
nameKey string
query string
timeKey string
timeFormat string
timezone string
defaultTags map[string]string
strict bool
MetricName string `toml:"metric_name"`
TagKeys []string `toml:"tag_keys"`
NameKey string `toml:"json_name_key"`
StringFields []string `toml:"json_string_fields"`
Query string `toml:"json_query"`
TimeKey string `toml:"json_time_key"`
TimeFormat string `toml:"json_time_format"`
Timezone string `toml:"json_timezone"`
Strict bool `toml:"json_strict"`
Log telegraf.Logger `toml:"-"`
}
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
func New(config *Config) (*Parser, error) {
stringFilter, err := filter.Compile(config.StringFields)
if err != nil {
return nil, err
}
tagKeyFilter, err := filter.Compile(config.TagKeys)
if err != nil {
return nil, err
}
return &Parser{
metricName: config.MetricName,
tagKeys: tagKeyFilter,
nameKey: config.NameKey,
stringFields: stringFilter,
query: config.Query,
timeKey: config.TimeKey,
timeFormat: config.TimeFormat,
timezone: config.Timezone,
defaultTags: config.DefaultTags,
strict: config.Strict,
}, nil
tagFilter filter.Filter
stringFilter filter.Filter
}
func (p *Parser) parseArray(data []interface{}, timestamp time.Time) ([]telegraf.Metric, error) {
@ -82,7 +48,7 @@ func (p *Parser) parseArray(data []interface{}, timestamp time.Time) ([]telegraf
case map[string]interface{}:
metrics, err := p.parseObject(v, timestamp)
if err != nil {
if p.strict {
if p.Strict {
return nil, err
}
continue
@ -98,7 +64,7 @@ func (p *Parser) parseArray(data []interface{}, timestamp time.Time) ([]telegraf
func (p *Parser) parseObject(data map[string]interface{}, timestamp time.Time) ([]telegraf.Metric, error) {
tags := make(map[string]string)
for k, v := range p.defaultTags {
for k, v := range p.DefaultTags {
tags[k] = v
}
@ -108,33 +74,33 @@ func (p *Parser) parseObject(data map[string]interface{}, timestamp time.Time) (
return nil, err
}
name := p.metricName
name := p.MetricName
// checks if json_name_key is set
if p.nameKey != "" {
if field, ok := f.Fields[p.nameKey].(string); ok {
if p.NameKey != "" {
if field, ok := f.Fields[p.NameKey].(string); ok {
name = field
}
}
// if time key is specified, set timestamp to it
if p.timeKey != "" {
if p.timeFormat == "" {
if p.TimeKey != "" {
if p.TimeFormat == "" {
err := fmt.Errorf("use of 'json_time_key' requires 'json_time_format'")
return nil, err
}
if f.Fields[p.timeKey] == nil {
if f.Fields[p.TimeKey] == nil {
err := fmt.Errorf("JSON time key could not be found")
return nil, err
}
timestamp, err = internal.ParseTimestamp(p.timeFormat, f.Fields[p.timeKey], p.timezone)
timestamp, err = internal.ParseTimestamp(p.TimeFormat, f.Fields[p.TimeKey], p.Timezone)
if err != nil {
return nil, err
}
delete(f.Fields, p.timeKey)
delete(f.Fields, p.TimeKey)
// if the year is 0, set to current year
if timestamp.Year() == 0 {
@ -149,19 +115,19 @@ func (p *Parser) parseObject(data map[string]interface{}, timestamp time.Time) (
}
// will take in field map with strings and bools,
// search for TagKeys that match fieldnames and add them to tags
// search for tag-keys that match fieldnames and add them to tags
// will delete any strings/bools that shouldn't be fields
// assumes that any non-numeric values in TagKeys should be displayed as tags
func (p *Parser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) {
for name, value := range fields {
if p.tagKeys == nil {
if p.tagFilter == nil {
continue
}
// skip switch statement if tagkey doesn't match fieldname
if !p.tagKeys.Match(name) {
if !p.tagFilter.Match(name) {
continue
}
// switch any fields in tagkeys into tags
// switch any fields in TagKeys into tags
switch t := value.(type) {
case string:
tags[name] = t
@ -181,7 +147,7 @@ func (p *Parser) switchFieldToTag(tags map[string]string, fields map[string]inte
for fk := range fields {
switch fields[fk].(type) {
case string, bool:
if p.stringFields != nil && p.stringFields.Match(fk) {
if p.stringFilter != nil && p.stringFilter.Match(fk) {
continue
}
delete(fields, fk)
@ -190,9 +156,25 @@ func (p *Parser) switchFieldToTag(tags map[string]string, fields map[string]inte
return tags, fields
}
func (p *Parser) Init() error {
var err error
p.stringFilter, err = filter.Compile(p.StringFields)
if err != nil {
return fmt.Errorf("compiling string-fields filter failed: %v", err)
}
p.tagFilter, err = filter.Compile(p.TagKeys)
if err != nil {
return fmt.Errorf("compiling tag-key filter failed: %v", err)
}
return nil
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
if p.query != "" {
result := gjson.GetBytes(buf, p.query)
if p.Query != "" {
result := gjson.GetBytes(buf, p.Query)
buf = []byte(result.Raw)
if !result.IsArray() && !result.IsObject() && result.Type != gjson.Null {
err := fmt.Errorf("query path must lead to a JSON object, array of objects or null, but lead to: %v", result.Type)
@ -243,76 +225,27 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags
p.DefaultTags = tags
}
type JSONFlattener struct {
Fields map[string]interface{}
func init() {
parsers.Add("json",
func(defaultMetricName string) telegraf.Parser {
return &Parser{MetricName: defaultMetricName}
})
}
// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string)
func (f *JSONFlattener) FlattenJSON(
fieldname string,
v interface{}) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.MetricName = config.MetricName
p.TagKeys = config.TagKeys
p.NameKey = config.JSONNameKey
p.StringFields = config.JSONStringFields
p.Query = config.JSONQuery
p.TimeKey = config.JSONTimeKey
p.TimeFormat = config.JSONTimeFormat
p.Timezone = config.JSONTimezone
p.Strict = config.JSONStrict
p.DefaultTags = config.DefaultTags
return f.FullFlattenJSON(fieldname, v, false, false)
}
// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string)
func (f *JSONFlattener) FullFlattenJSON(
fieldname string,
v interface{},
convertString bool,
convertBool bool,
) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
fieldkey := k
if fieldname != "" {
fieldkey = fieldname + "_" + fieldkey
}
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
if err != nil {
return err
}
}
case []interface{}:
for i, v := range t {
fieldkey := strconv.Itoa(i)
if fieldname != "" {
fieldkey = fieldname + "_" + fieldkey
}
err := f.FullFlattenJSON(fieldkey, v, convertString, convertBool)
if err != nil {
return err
}
}
case float64:
f.Fields[fieldname] = t
case string:
if !convertString {
return nil
}
f.Fields[fieldname] = v.(string)
case bool:
if !convertBool {
return nil
}
f.Fields[fieldname] = v.(bool)
case nil:
return nil
default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
t, t, fieldname)
}
return nil
return p.Init()
}

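Taken together, the parser is now configured through exported struct fields and a construct-then-Init lifecycle: Init compiles the tag-key and string-field filters that Parse consults, and old-style option tables keep working through InitFromConfig, which copies the json_* settings across and then calls Init itself. A minimal end-to-end sketch with an assumed metric name and input document:

package main

import (
    "fmt"

    "github.com/influxdata/telegraf/plugins/parsers/json"
)

func main() {
    p := &json.Parser{
        MetricName:   "example",
        TagKeys:      []string{"host"},
        StringFields: []string{"status"},
    }
    if err := p.Init(); err != nil { // compiles tagFilter and stringFilter
        panic(err)
    }

    metrics, err := p.Parse([]byte(`{"host":"a","value":42.5,"status":"ok"}`))
    if err != nil {
        panic(err)
    }
    // "host" becomes a tag, "value" stays a numeric field, and "status" survives
    // as a string field because it matches the StringFields filter.
    fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
}
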
View File

@ -75,10 +75,8 @@ const validJSONArrayTags = `
`
func TestParseValidJSON(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_test"}
require.NoError(t, parser.Init())
// Most basic vanilla test
metrics, err := parser.Parse([]byte(validJSON))
@ -125,10 +123,8 @@ func TestParseValidJSON(t *testing.T) {
}
func TestParseLineValidJSON(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_test"}
require.NoError(t, parser.Init())
// Most basic vanilla test
metric, err := parser.ParseLine(validJSON)
@ -162,12 +158,10 @@ func TestParseLineValidJSON(t *testing.T) {
}
func TestParseInvalidJSON(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_test"}
require.NoError(t, parser.Init())
_, err = parser.Parse([]byte(invalidJSON))
_, err := parser.Parse([]byte(invalidJSON))
require.Error(t, err)
_, err = parser.Parse([]byte(invalidJSON2))
require.Error(t, err)
@ -176,47 +170,47 @@ func TestParseInvalidJSON(t *testing.T) {
}
func TestParseJSONImplicitStrictness(t *testing.T) {
parserImplicitNoStrict, err := New(&Config{
parserImplicitNoStrict := &Parser{
MetricName: "json_test",
TimeKey: "time",
})
require.NoError(t, err)
}
require.NoError(t, parserImplicitNoStrict.Init())
_, err = parserImplicitNoStrict.Parse([]byte(mixedValidityJSON))
_, err := parserImplicitNoStrict.Parse([]byte(mixedValidityJSON))
require.NoError(t, err)
}
func TestParseJSONExplicitStrictnessFalse(t *testing.T) {
parserNoStrict, err := New(&Config{
parserNoStrict := &Parser{
MetricName: "json_test",
TimeKey: "time",
Strict: false,
})
require.NoError(t, err)
}
require.NoError(t, parserNoStrict.Init())
_, err = parserNoStrict.Parse([]byte(mixedValidityJSON))
_, err := parserNoStrict.Parse([]byte(mixedValidityJSON))
require.NoError(t, err)
}
func TestParseJSONExplicitStrictnessTrue(t *testing.T) {
parserStrict, err := New(&Config{
parserStrict := &Parser{
MetricName: "json_test",
TimeKey: "time",
Strict: true,
})
require.NoError(t, err)
}
require.NoError(t, parserStrict.Init())
_, err = parserStrict.Parse([]byte(mixedValidityJSON))
_, err := parserStrict.Parse([]byte(mixedValidityJSON))
require.Error(t, err)
}
func TestParseWithTagKeys(t *testing.T) {
// Test that strings not matching tag keys are ignored
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"wrongtagkey"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(validJSONTags))
require.NoError(t, err)
@ -229,11 +223,11 @@ func TestParseWithTagKeys(t *testing.T) {
require.Equal(t, map[string]string{}, metrics[0].Tags())
// Test that single tag key is found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err = parser.Parse([]byte(validJSONTags))
require.NoError(t, err)
@ -248,11 +242,12 @@ func TestParseWithTagKeys(t *testing.T) {
}, metrics[0].Tags())
// Test that both tag keys are found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag", "othertag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err = parser.Parse([]byte(validJSONTags))
require.NoError(t, err)
require.Len(t, metrics, 1)
@ -269,11 +264,12 @@ func TestParseWithTagKeys(t *testing.T) {
func TestParseLineWithTagKeys(t *testing.T) {
// Test that strings not matching tag keys are ignored
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"wrongtagkey"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metric, err := parser.ParseLine(validJSONTags)
require.NoError(t, err)
require.Equal(t, "json_test", metric.Name())
@ -284,11 +280,11 @@ func TestParseLineWithTagKeys(t *testing.T) {
require.Equal(t, map[string]string{}, metric.Tags())
// Test that single tag key is found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metric, err = parser.ParseLine(validJSONTags)
require.NoError(t, err)
@ -302,11 +298,11 @@ func TestParseLineWithTagKeys(t *testing.T) {
}, metric.Tags())
// Test that both tag keys are found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag", "othertag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metric, err = parser.ParseLine(validJSONTags)
require.NoError(t, err)
@ -322,14 +318,12 @@ func TestParseLineWithTagKeys(t *testing.T) {
}
func TestParseValidJSONDefaultTags(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
TagKeys: []string{"mytag"},
DefaultTags: map[string]string{
"t4g": "default",
},
})
require.NoError(t, err)
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag"},
DefaultTags: map[string]string{"t4g": "default"},
}
require.NoError(t, parser.Init())
// Most basic vanilla test
metrics, err := parser.Parse([]byte(validJSON))
@ -359,14 +353,12 @@ func TestParseValidJSONDefaultTags(t *testing.T) {
// Test that default tags are overridden by tag keys
func TestParseValidJSONDefaultTagsOverride(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
TagKeys: []string{"mytag"},
DefaultTags: map[string]string{
"mytag": "default",
},
})
require.NoError(t, err)
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag"},
DefaultTags: map[string]string{"mytag": "default"},
}
require.NoError(t, parser.Init())
// Most basic vanilla test
metrics, err := parser.Parse([]byte(validJSON))
@ -395,10 +387,8 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) {
// Test that json arrays can be parsed
func TestParseValidJSONArray(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_array_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_array_test"}
require.NoError(t, parser.Init())
// Most basic vanilla test
metrics, err := parser.Parse([]byte(validJSONArray))
@ -431,11 +421,11 @@ func TestParseValidJSONArray(t *testing.T) {
func TestParseArrayWithTagKeys(t *testing.T) {
// Test that strings not matching tag keys are ignored
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_array_test",
TagKeys: []string{"wrongtagkey"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(validJSONArrayTags))
require.NoError(t, err)
@ -455,11 +445,11 @@ func TestParseArrayWithTagKeys(t *testing.T) {
require.Equal(t, map[string]string{}, metrics[1].Tags())
// Test that single tag key is found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_array_test",
TagKeys: []string{"mytag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err = parser.Parse([]byte(validJSONArrayTags))
require.NoError(t, err)
@ -483,11 +473,11 @@ func TestParseArrayWithTagKeys(t *testing.T) {
}, metrics[1].Tags())
// Test that both tag keys are found and applied
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_array_test",
TagKeys: []string{"mytag", "othertag"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err = parser.Parse([]byte(validJSONArrayTags))
require.NoError(t, err)
@ -516,13 +506,11 @@ func TestParseArrayWithTagKeys(t *testing.T) {
var jsonBOM = []byte("\xef\xbb\xbf[{\"value\":17}]")
func TestHttpJsonBOM(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_test"}
require.NoError(t, parser.Init())
// Most basic vanilla test
_, err = parser.Parse(jsonBOM)
_, err := parser.Parse(jsonBOM)
require.NoError(t, err)
}
@ -546,11 +534,11 @@ func TestJSONParseNestedArray(t *testing.T) {
}
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.Len(t, metrics, 1)
@ -575,14 +563,14 @@ func TestJSONQueryErrorOnArray(t *testing.T) {
}
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{},
Query: "shares.myArr",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
_, err = parser.Parse([]byte(testString))
_, err := parser.Parse([]byte(testString))
require.Error(t, err)
}
@ -609,12 +597,12 @@ func TestArrayOfObjects(t *testing.T) {
"more_stuff":"junk"
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TagKeys: []string{"ice"},
Query: "meta.shares",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
@ -636,13 +624,13 @@ func TestUseCaseJSONQuery(t *testing.T) {
}
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
StringFields: []string{"last"},
TagKeys: []string{"first"},
Query: "obj.friends",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
@ -672,12 +660,13 @@ func TestTimeParser(t *testing.T) {
}
]`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TimeKey: "b_time",
TimeFormat: "02 Jan 06 15:04 MST",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, 2, len(metrics))
@ -689,13 +678,14 @@ func TestTimeParserWithTimezone(t *testing.T) {
"time": "04 Jan 06 15:04"
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TimeKey: "time",
TimeFormat: "02 Jan 06 15:04",
Timezone: "America/New_York",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, 1, len(metrics))
@ -724,12 +714,12 @@ func TestUnixTimeParser(t *testing.T) {
}
]`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TimeKey: "b_time",
TimeFormat: "unix",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
@ -759,12 +749,12 @@ func TestUnixMsTimeParser(t *testing.T) {
}
]`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TimeKey: "b_time",
TimeFormat: "unix_ms",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
@ -783,12 +773,12 @@ func TestTimeErrors(t *testing.T) {
"my_tag_2": "baz"
}`
parser, err := New(&Config{
parser := &Parser{
MetricName: "json_test",
TimeKey: "b_time",
TimeFormat: "02 January 06 15:04 MST",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.Error(t, err)
@ -803,12 +793,12 @@ func TestTimeErrors(t *testing.T) {
"my_tag_2": "baz"
}`
parser, err = New(&Config{
parser = &Parser{
MetricName: "json_test",
TimeKey: "b_time",
TimeFormat: "02 January 06 15:04 MST",
})
require.NoError(t, err)
}
require.NoError(t, parser.Init())
metrics, err = parser.Parse([]byte(testString2))
require.Error(t, err)
@ -817,10 +807,8 @@ func TestTimeErrors(t *testing.T) {
}
func TestShareTimestamp(t *testing.T) {
parser, err := New(&Config{
MetricName: "json_test",
})
require.NoError(t, err)
parser := &Parser{MetricName: "json_test"}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(validJSONArrayMultiple))
require.NoError(t, err)
@ -839,10 +827,8 @@ func TestNameKey(t *testing.T) {
"my_tag_2": "baz"
}`
parser, err := New(&Config{
NameKey: "b_c",
})
require.NoError(t, err)
parser := &Parser{NameKey: "b_c"}
require.NoError(t, parser.Init())
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
@ -852,23 +838,23 @@ func TestNameKey(t *testing.T) {
func TestParseArrayWithWrongType(t *testing.T) {
data := `[{"answer": 42}, 123]`
parser, err := New(&Config{})
require.NoError(t, err)
parser := &Parser{}
require.NoError(t, parser.Init())
_, err = parser.Parse([]byte(data))
_, err := parser.Parse([]byte(data))
require.Error(t, err)
}
func TestParse(t *testing.T) {
tests := []struct {
name string
config *Config
parser *Parser
input []byte
expected []telegraf.Metric
}{
{
name: "tag keys with underscore issue 6705",
config: &Config{
parser: &Parser{
MetricName: "json",
TagKeys: []string{"metric___name__"},
},
@ -888,25 +874,25 @@ func TestParse(t *testing.T) {
},
{
name: "parse empty array",
config: &Config{},
parser: &Parser{},
input: []byte(`[]`),
expected: []telegraf.Metric{},
},
{
name: "parse null",
config: &Config{},
parser: &Parser{},
input: []byte(`null`),
expected: []telegraf.Metric{},
},
{
name: "parse null with query",
config: &Config{Query: "result.data"},
parser: &Parser{Query: "result.data"},
input: []byte(`{"error":null,"result":{"data":null,"items_per_page":10,"total_items":0,"total_pages":0}}`),
expected: []telegraf.Metric{},
},
{
name: "parse simple array",
config: &Config{
parser: &Parser{
MetricName: "json",
},
input: []byte(`[{"answer": 42}]`),
@ -923,7 +909,7 @@ func TestParse(t *testing.T) {
},
{
name: "string field glob",
config: &Config{
parser: &Parser{
MetricName: "json",
StringFields: []string{"*"},
},
@ -947,7 +933,7 @@ func TestParse(t *testing.T) {
},
{
name: "time key is deleted from fields",
config: &Config{
parser: &Parser{
MetricName: "json",
TimeKey: "timestamp",
TimeFormat: "unix",
@ -972,8 +958,8 @@ func TestParse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser, err := New(tt.config)
require.NoError(t, err)
parser := tt.parser
require.NoError(t, parser.Init())
actual, err := parser.Parse(tt.input)
require.NoError(t, err)
@ -986,13 +972,13 @@ func TestParse(t *testing.T) {
func TestParseWithWildcardTagKeys(t *testing.T) {
var tests = []struct {
name string
config *Config
parser *Parser
input []byte
expected []telegraf.Metric
}{
{
name: "wildcard matching with tags nested within object",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"tags_object_*"},
},
@ -1014,7 +1000,7 @@ func TestParseWithWildcardTagKeys(t *testing.T) {
},
{
name: "wildcard matching with keys containing tag",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"*tag"},
},
@ -1038,7 +1024,7 @@ func TestParseWithWildcardTagKeys(t *testing.T) {
},
{
name: "strings not matching tag keys are still also ignored",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"wrongtagkey", "tags_object_*"},
},
@ -1060,7 +1046,7 @@ func TestParseWithWildcardTagKeys(t *testing.T) {
},
{
name: "single tag key is also found and applied",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag", "tags_object_*"},
},
@ -1084,8 +1070,8 @@ func TestParseWithWildcardTagKeys(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser, err := New(tt.config)
require.NoError(t, err)
parser := tt.parser
require.NoError(t, parser.Init())
actual, err := parser.Parse(tt.input)
require.NoError(t, err)
@ -1097,13 +1083,13 @@ func TestParseWithWildcardTagKeys(t *testing.T) {
func TestParseLineWithWildcardTagKeys(t *testing.T) {
var tests = []struct {
name string
config *Config
parser *Parser
input string
expected telegraf.Metric
}{
{
name: "wildcard matching with tags nested within object",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"tags_object_*"},
},
@ -1123,7 +1109,7 @@ func TestParseLineWithWildcardTagKeys(t *testing.T) {
},
{
name: "wildcard matching with keys containing tag",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"*tag"},
},
@ -1145,7 +1131,7 @@ func TestParseLineWithWildcardTagKeys(t *testing.T) {
},
{
name: "strings not matching tag keys are ignored",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"wrongtagkey", "tags_object_*"},
},
@ -1165,7 +1151,7 @@ func TestParseLineWithWildcardTagKeys(t *testing.T) {
},
{
name: "single tag key is also found and applied",
config: &Config{
parser: &Parser{
MetricName: "json_test",
TagKeys: []string{"mytag", "tags_object_*"},
},
@ -1188,8 +1174,8 @@ func TestParseLineWithWildcardTagKeys(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser, err := New(tt.config)
require.NoError(t, err)
parser := tt.parser
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -1202,13 +1188,13 @@ func TestParseLineWithWildcardTagKeys(t *testing.T) {
func TestParseArrayWithWildcardTagKeys(t *testing.T) {
var tests = []struct {
name string
config *Config
parser *Parser
input []byte
expected []telegraf.Metric
}{
{
name: "wildcard matching with keys containing tag within array works",
config: &Config{
parser: &Parser{
MetricName: "json_array_test",
TagKeys: []string{"*tag"},
},
@ -1246,7 +1232,7 @@ func TestParseArrayWithWildcardTagKeys(t *testing.T) {
},
{
name: " wildcard matching with tags nested array within object works",
config: &Config{
parser: &Parser{
MetricName: "json_array_test",
TagKeys: []string{"tags_array_*"},
},
@ -1280,7 +1266,7 @@ func TestParseArrayWithWildcardTagKeys(t *testing.T) {
},
{
name: "strings not matching tag keys are still also ignored",
config: &Config{
parser: &Parser{
MetricName: "json_array_test",
TagKeys: []string{"mytag", "*tag"},
},
@ -1318,7 +1304,7 @@ func TestParseArrayWithWildcardTagKeys(t *testing.T) {
},
{
name: "single tag key is also found and applied",
config: &Config{
parser: &Parser{
MetricName: "json_array_test",
TagKeys: []string{"anothert", "*tag"},
},
@ -1360,8 +1346,8 @@ func TestParseArrayWithWildcardTagKeys(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser, err := New(tt.config)
require.NoError(t, err)
parser := tt.parser
require.NoError(t, parser.Init())
actual, err := parser.Parse(tt.input)
require.NoError(t, err)

View File

@ -11,7 +11,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -233,21 +232,6 @@ func NewParser(config *Config) (Parser, error) {
var err error
var parser Parser
switch config.DataFormat {
case "json":
parser, err = json.New(
&json.Config{
MetricName: config.MetricName,
TagKeys: config.TagKeys,
NameKey: config.JSONNameKey,
StringFields: config.JSONStringFields,
Query: config.JSONQuery,
TimeKey: config.JSONTimeKey,
TimeFormat: config.JSONTimeFormat,
Timezone: config.JSONTimezone,
DefaultTags: config.DefaultTags,
Strict: config.JSONStrict,
},
)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.ValueFieldName, config.DefaultTags)

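With the json case gone from the central NewParser switch, callers reach the parser through the registry that each new-style package populates from its init() via parsers.Add; the blank import added in the next hunk is what triggers that registration in the tests. A hedged sketch of the lookup; the exported Parsers map is assumed from the registry package and does not appear in this diff:

package main

import (
    "fmt"

    "github.com/influxdata/telegraf/plugins/parsers"
    // Blank import for side effects only: each registered parser package's
    // init() calls parsers.Add, populating the registry consulted below.
    _ "github.com/influxdata/telegraf/plugins/parsers/all"
)

func main() {
    creator, ok := parsers.Parsers["json"] // assumed registry accessor
    if !ok {
        panic("json parser not registered")
    }
    p := creator("example") // yields &json.Parser{MetricName: "example"}
    fmt.Printf("%T\n", p)   // *json.Parser
}
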
View File

@ -8,6 +8,9 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
//Blank import to register all new-style parsers
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"