fix(parsers.json_v2): allow optional paths and handle wrong paths correctly (#10468)
This commit is contained in:
parent
9cfd7491f8
commit
75946f56f1
|
|
@ -1640,6 +1640,7 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
|
||||||
for _, objectConfig := range objectconfigs {
|
for _, objectConfig := range objectconfigs {
|
||||||
var o json_v2.JSONObject
|
var o json_v2.JSONObject
|
||||||
c.getFieldString(objectConfig, "path", &o.Path)
|
c.getFieldString(objectConfig, "path", &o.Path)
|
||||||
|
c.getFieldBool(objectConfig, "optional", &o.Optional)
|
||||||
c.getFieldString(objectConfig, "timestamp_key", &o.TimestampKey)
|
c.getFieldString(objectConfig, "timestamp_key", &o.TimestampKey)
|
||||||
c.getFieldString(objectConfig, "timestamp_format", &o.TimestampFormat)
|
c.getFieldString(objectConfig, "timestamp_format", &o.TimestampFormat)
|
||||||
c.getFieldString(objectConfig, "timestamp_timezone", &o.TimestampTimezone)
|
c.getFieldString(objectConfig, "timestamp_timezone", &o.TimestampTimezone)
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,12 @@ You configure this parser by describing the line protocol you want by defining t
|
||||||
[[inputs.file.json_v2.object]]
|
[[inputs.file.json_v2.object]]
|
||||||
path = "" # A string with valid GJSON path syntax, can include arrays and objects
|
path = "" # A string with valid GJSON path syntax, can include arrays and objects
|
||||||
|
|
||||||
|
## WARNING: Setting optional to true will suppress errors if the configured Path doesn't match the JSON
|
||||||
|
## This should be used with caution because it removes the safety net of verifying the provided path
|
||||||
|
## This was introduced to support situations when parsing multiple incoming JSON payloads with wildcards
|
||||||
|
## More context: https://github.com/influxdata/telegraf/issues/10072
|
||||||
|
optional = false
|
||||||
|
|
||||||
## Configuration to define what JSON keys should be used as timestamps ##
|
## Configuration to define what JSON keys should be used as timestamps ##
|
||||||
timestamp_key = "" # A JSON key (for a nested key, prepend the parent keys with underscores) to a valid timestamp
|
timestamp_key = "" # A JSON key (for a nested key, prepend the parent keys with underscores) to a valid timestamp
|
||||||
timestamp_format = "" # A string with a valid timestamp format (see below for possible values)
|
timestamp_format = "" # A string with a valid timestamp format (see below for possible values)
|
||||||
|
|
|
||||||
|
|
@ -54,24 +54,25 @@ type Config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type DataSet struct {
|
type DataSet struct {
|
||||||
Path string `toml:"path"` // REQUIRED
|
Path string `toml:"path"` // REQUIRED
|
||||||
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
||||||
Rename string `toml:"rename"` // OPTIONAL
|
Rename string `toml:"rename"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type JSONObject struct {
|
type JSONObject struct {
|
||||||
Path string `toml:"path"` // REQUIRED
|
Path string `toml:"path"` // REQUIRED
|
||||||
TimestampKey string `toml:"timestamp_key"` // OPTIONAL
|
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||||
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_key is defined
|
TimestampKey string `toml:"timestamp_key"`
|
||||||
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_key
|
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_key is defined
|
||||||
Renames map[string]string `toml:"renames"` // OPTIONAL
|
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_key
|
||||||
Fields map[string]string `toml:"fields"` // OPTIONAL
|
Renames map[string]string `toml:"renames"`
|
||||||
Tags []string `toml:"tags"` // OPTIONAL
|
Fields map[string]string `toml:"fields"`
|
||||||
IncludedKeys []string `toml:"included_keys"` // OPTIONAL
|
Tags []string `toml:"tags"`
|
||||||
ExcludedKeys []string `toml:"excluded_keys"` // OPTIONAL
|
IncludedKeys []string `toml:"included_keys"`
|
||||||
DisablePrependKeys bool `toml:"disable_prepend_keys"` // OPTIONAL
|
ExcludedKeys []string `toml:"excluded_keys"`
|
||||||
FieldPaths []DataSet // OPTIONAL
|
DisablePrependKeys bool `toml:"disable_prepend_keys"`
|
||||||
TagPaths []DataSet // OPTIONAL
|
FieldPaths []DataSet
|
||||||
|
TagPaths []DataSet
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricNode struct {
|
type MetricNode struct {
|
||||||
|
|
@ -90,6 +91,8 @@ type MetricNode struct {
|
||||||
gjson.Result
|
gjson.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const GJSONPathNUllErrorMSG = "GJSON Path returned null, either couldn't find value or path has null value"
|
||||||
|
|
||||||
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
||||||
// Only valid JSON is supported
|
// Only valid JSON is supported
|
||||||
if !gjson.Valid(string(input)) {
|
if !gjson.Valid(string(input)) {
|
||||||
|
|
@ -112,6 +115,9 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
||||||
p.timestamp = time.Now()
|
p.timestamp = time.Now()
|
||||||
if c.TimestampPath != "" {
|
if c.TimestampPath != "" {
|
||||||
result := gjson.GetBytes(input, c.TimestampPath)
|
result := gjson.GetBytes(input, c.TimestampPath)
|
||||||
|
if result.Type == gjson.Null {
|
||||||
|
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||||
|
}
|
||||||
if !result.IsArray() && !result.IsObject() {
|
if !result.IsArray() && !result.IsObject() {
|
||||||
if c.TimestampFormat == "" {
|
if c.TimestampFormat == "" {
|
||||||
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
|
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
|
||||||
|
|
@ -175,6 +181,9 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegr
|
||||||
return nil, fmt.Errorf("GJSON path is required")
|
return nil, fmt.Errorf("GJSON path is required")
|
||||||
}
|
}
|
||||||
result := gjson.GetBytes(input, c.Path)
|
result := gjson.GetBytes(input, c.Path)
|
||||||
|
if result.Type == gjson.Null {
|
||||||
|
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||||
|
}
|
||||||
|
|
||||||
if result.IsObject() {
|
if result.IsObject() {
|
||||||
p.Log.Debugf("Found object in the path: %s, ignoring it please use 'object' to gather metrics from objects", c.Path)
|
p.Log.Debugf("Found object in the path: %s, ignoring it please use 'object' to gather metrics from objects", c.Path)
|
||||||
|
|
@ -313,7 +322,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if result.SetName == p.objectConfig.TimestampKey {
|
if p.objectConfig.TimestampKey != "" && result.SetName == p.objectConfig.TimestampKey {
|
||||||
if p.objectConfig.TimestampFormat == "" {
|
if p.objectConfig.TimestampFormat == "" {
|
||||||
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
|
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -400,12 +409,25 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
|
||||||
if c.Path == "" {
|
if c.Path == "" {
|
||||||
return nil, fmt.Errorf("GJSON path is required")
|
return nil, fmt.Errorf("GJSON path is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
result := gjson.GetBytes(input, c.Path)
|
result := gjson.GetBytes(input, c.Path)
|
||||||
|
if result.Type == gjson.Null {
|
||||||
|
if c.Optional {
|
||||||
|
// If path is marked as optional don't error if path doesn't return a result
|
||||||
|
p.Log.Debugf(GJSONPathNUllErrorMSG)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||||
|
}
|
||||||
|
|
||||||
scopedJSON := []byte(result.Raw)
|
scopedJSON := []byte(result.Raw)
|
||||||
for _, f := range c.FieldPaths {
|
for _, f := range c.FieldPaths {
|
||||||
var r PathResult
|
var r PathResult
|
||||||
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
||||||
|
if r.result.Type == gjson.Null {
|
||||||
|
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||||
|
}
|
||||||
r.DataSet = f
|
r.DataSet = f
|
||||||
p.subPathResults = append(p.subPathResults, r)
|
p.subPathResults = append(p.subPathResults, r)
|
||||||
}
|
}
|
||||||
|
|
@ -413,6 +435,9 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
|
||||||
for _, f := range c.TagPaths {
|
for _, f := range c.TagPaths {
|
||||||
var r PathResult
|
var r PathResult
|
||||||
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
||||||
|
if r.result.Type == gjson.Null {
|
||||||
|
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||||
|
}
|
||||||
r.DataSet = f
|
r.DataSet = f
|
||||||
r.tag = true
|
r.tag = true
|
||||||
p.subPathResults = append(p.subPathResults, r)
|
p.subPathResults = append(p.subPathResults, r)
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/file"
|
"github.com/influxdata/telegraf/plugins/inputs/file"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
@ -25,6 +26,16 @@ func TestMultipleConfigs(t *testing.T) {
|
||||||
// Make sure testdata contains data
|
// Make sure testdata contains data
|
||||||
require.Greater(t, len(folders), 0)
|
require.Greater(t, len(folders), 0)
|
||||||
|
|
||||||
|
expectedErrors := []struct {
|
||||||
|
Name string
|
||||||
|
Error error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Name: "wrong_path",
|
||||||
|
Error: fmt.Errorf(json_v2.GJSONPathNUllErrorMSG),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
for _, f := range folders {
|
for _, f := range folders {
|
||||||
t.Run(f.Name(), func(t *testing.T) {
|
t.Run(f.Name(), func(t *testing.T) {
|
||||||
// Process the telegraf config file for the test
|
// Process the telegraf config file for the test
|
||||||
|
|
@ -39,11 +50,22 @@ func TestMultipleConfigs(t *testing.T) {
|
||||||
|
|
||||||
// Gather the metrics from the input file configure
|
// Gather the metrics from the input file configure
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
for _, i := range cfg.Inputs {
|
for _, input := range cfg.Inputs {
|
||||||
err = i.Init()
|
err = input.Init()
|
||||||
require.NoError(t, err)
|
|
||||||
err = i.Gather(&acc)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
err = input.Gather(&acc)
|
||||||
|
// If the test has an expected error then require one was received
|
||||||
|
var expectedError bool
|
||||||
|
for _, e := range expectedErrors {
|
||||||
|
if e.Name == f.Name() {
|
||||||
|
require.Equal(t, e.Error, err)
|
||||||
|
expectedError = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !expectedError {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process expected metrics and compare with resulting metrics
|
// Process expected metrics and compare with resulting metrics
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
{
|
||||||
|
"test": "test"
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
# Example taken from: https://github.com/influxdata/telegraf/issues/7097
|
||||||
|
|
||||||
|
# Object path does not match the input JSON; optional = true suppresses the error
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/optional/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.object]]
|
||||||
|
path = "wrong"
|
||||||
|
optional = true
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
{
|
||||||
|
"correct": "test"
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,46 @@
|
||||||
|
# Example taken from: https://github.com/influxdata/telegraf/issues/7097
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.object]]
|
||||||
|
path = "wrong"
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.object]]
|
||||||
|
path = "correct"
|
||||||
|
[[inputs.file.json_v2.object.tag]]
|
||||||
|
path = "wrong"
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.object]]
|
||||||
|
path = "correct"
|
||||||
|
[[inputs.file.json_v2.object.field]]
|
||||||
|
path = "wrong"
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
timestamp_path = "wrong"
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.tag]]
|
||||||
|
path = "wrong"
|
||||||
|
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["./testdata/wrong_path/input.json"]
|
||||||
|
data_format = "json_v2"
|
||||||
|
[[inputs.file.json_v2]]
|
||||||
|
[[inputs.file.json_v2.field]]
|
||||||
|
path = "wrong"
|
||||||
Loading…
Reference in New Issue