fix(parsers.json_v2): Check if gpath exists and support optional in fields/tags (#10799)
This commit is contained in:
parent
35d69de1fc
commit
b526945c64
|
|
@ -1710,6 +1710,7 @@ func getFieldSubtable(c *Config, metricConfig *ast.Table) []json_v2.DataSet {
|
|||
c.getFieldString(fieldconfig, "path", &f.Path)
|
||||
c.getFieldString(fieldconfig, "rename", &f.Rename)
|
||||
c.getFieldString(fieldconfig, "type", &f.Type)
|
||||
c.getFieldBool(fieldconfig, "optional", &f.Optional)
|
||||
fields = append(fields, f)
|
||||
}
|
||||
}
|
||||
|
|
@ -1729,6 +1730,7 @@ func getTagSubtable(c *Config, metricConfig *ast.Table) []json_v2.DataSet {
|
|||
c.getFieldString(fieldconfig, "rename", &t.Rename)
|
||||
t.Type = "string"
|
||||
tags = append(tags, t)
|
||||
c.getFieldBool(fieldconfig, "optional", &t.Optional)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,17 +21,18 @@ You configure this parser by describing the line protocol you want by defining t
|
|||
[[inputs.file.json_v2.tag]]
|
||||
path = "" # A string with valid GJSON path syntax to a non-array/non-object value
|
||||
rename = "new name" # A string with a new name for the tag key
|
||||
## Setting optional to true will suppress errors if the configured Path doesn't match the JSON
|
||||
optional = false
|
||||
[[inputs.file.json_v2.field]]
|
||||
path = "" # A string with valid GJSON path syntax to a non-array/non-object value
|
||||
rename = "new name" # A string with a new name for the tag key
|
||||
type = "int" # A string specifying the type (int,uint,float,string,bool)
|
||||
## Setting optional to true will suppress errors if the configured Path doesn't match the JSON
|
||||
optional = false
|
||||
[[inputs.file.json_v2.object]]
|
||||
path = "" # A string with valid GJSON path syntax, can include array's and object's
|
||||
|
||||
## WARNING: Setting optional to true will suppress errors if the configured Path doesn't match the JSON
|
||||
## This should be used with caution because it removes the safety net of verifying the provided path
|
||||
## This was introduced to support situations when parsing multiple incoming JSON payloads with wildcards
|
||||
## More context: https://github.com/influxdata/telegraf/issues/10072
|
||||
## Setting optional to true will suppress errors if the configured Path doesn't match the JSON
|
||||
optional = false
|
||||
|
||||
## Configuration to define what JSON keys should be used as timestamps ##
|
||||
|
|
@ -99,6 +100,7 @@ Using this field configuration you can gather a non-array/non-object values. Not
|
|||
* **path (REQUIRED)**: A string with valid GJSON path syntax to a non-array/non-object value
|
||||
* **name (OPTIONAL)**: You can define a string value to set the field name. If not defined it will use the trailing word from the provided query.
|
||||
* **type (OPTIONAL)**: You can define a string value to set the desired type (float, int, uint, string, bool). If not defined it won't enforce a type and default to using the original type defined in the JSON (bool, float, or string).
|
||||
* **optional (OPTIONAL)**: Setting optional to true will suppress errors if the configured Path doesn't match the JSON. This should be used with caution because it removes the safety net of verifying the provided path. An example case to use this is with the `inputs.mqtt_consumer` plugin when you are expecting multiple JSON files.
|
||||
|
||||
#### **tag**
|
||||
|
||||
|
|
@ -106,6 +108,7 @@ Using this tag configuration you can gather a non-array/non-object values. Note
|
|||
|
||||
* **path (REQUIRED)**: A string with valid GJSON path syntax to a non-array/non-object value
|
||||
* **name (OPTIONAL)**: You can define a string value to set the field name. If not defined it will use the trailing word from the provided query.
|
||||
* **optional (OPTIONAL)**: Setting optional to true will suppress errors if the configured Path doesn't match the JSON. This should be used with caution because it removes the safety net of verifying the provided path. An example case to use this is with the `inputs.mqtt_consumer` plugin when you are expecting multiple JSON files.
|
||||
|
||||
For good examples in using `field` and `tag` you can reference the following example configs:
|
||||
|
||||
|
|
@ -118,6 +121,7 @@ With the configuration section `object`, you can gather values from [JSON object
|
|||
#### The following keys can be set for `object`
|
||||
|
||||
* **path (REQUIRED)**: You must define the path query that gathers the object with [GJSON Path Syntax](https://github.com/tidwall/gjson/blob/v1.7.5/SYNTAX.md)
|
||||
* **optional (OPTIONAL)**: Setting optional to true will suppress errors if the configured Path doesn't match the JSON. This should be used with caution because it removes the safety net of verifying the provided path. An example case to use this is with the `inputs.mqtt_consumer` plugin when you are expecting multiple JSON files.
|
||||
|
||||
*Keys to define what JSON keys should be used as timestamps:*
|
||||
|
||||
|
|
|
|||
|
|
@ -52,9 +52,10 @@ type Config struct {
|
|||
}
|
||||
|
||||
type DataSet struct {
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
||||
Rename string `toml:"rename"`
|
||||
Path string `toml:"path"` // REQUIRED
|
||||
Type string `toml:"type"` // OPTIONAL, can't be set for tags they will always be a string
|
||||
Rename string `toml:"rename"`
|
||||
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
|
||||
}
|
||||
|
||||
type JSONObject struct {
|
||||
|
|
@ -89,8 +90,6 @@ type MetricNode struct {
|
|||
gjson.Result
|
||||
}
|
||||
|
||||
const GJSONPathNUllErrorMSG = "GJSON Path returned null, either couldn't find value or path has null value"
|
||||
|
||||
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
||||
// Only valid JSON is supported
|
||||
if !gjson.Valid(string(input)) {
|
||||
|
|
@ -116,7 +115,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
|||
|
||||
if result.Type == gjson.Null {
|
||||
p.Log.Debugf("Message: %s", input)
|
||||
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||
return nil, fmt.Errorf("The timestamp path %s returned NULL", c.TimestampPath)
|
||||
}
|
||||
if !result.IsArray() && !result.IsObject() {
|
||||
if c.TimestampFormat == "" {
|
||||
|
|
@ -182,8 +181,11 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp
|
|||
return nil, fmt.Errorf("GJSON path is required")
|
||||
}
|
||||
result := gjson.GetBytes(input, c.Path)
|
||||
if result.Type == gjson.Null {
|
||||
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||
if skip, err := p.checkResult(result, c.Path, c.Optional); err != nil {
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if result.IsObject() {
|
||||
|
|
@ -226,6 +228,10 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp
|
|||
metrics[i] = cartesianProduct(metrics[i-1], metrics[i])
|
||||
}
|
||||
|
||||
if len(metrics) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return metrics[len(metrics)-1], nil
|
||||
}
|
||||
|
||||
|
|
@ -412,22 +418,22 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp ti
|
|||
}
|
||||
|
||||
result := gjson.GetBytes(input, c.Path)
|
||||
if result.Type == gjson.Null {
|
||||
if c.Optional {
|
||||
// If path is marked as optional don't error if path doesn't return a result
|
||||
p.Log.Debugf(GJSONPathNUllErrorMSG)
|
||||
if skip, err := p.checkResult(result, c.Path, c.Optional); err != nil {
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
scopedJSON := []byte(result.Raw)
|
||||
for _, f := range c.FieldPaths {
|
||||
var r PathResult
|
||||
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
||||
if r.result.Type == gjson.Null {
|
||||
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||
if skip, err := p.checkResult(r.result, f.Path, f.Optional); err != nil {
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
r.DataSet = f
|
||||
p.subPathResults = append(p.subPathResults, r)
|
||||
|
|
@ -436,8 +442,11 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp ti
|
|||
for _, f := range c.TagPaths {
|
||||
var r PathResult
|
||||
r.result = gjson.GetBytes(scopedJSON, f.Path)
|
||||
if r.result.Type == gjson.Null {
|
||||
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
|
||||
if skip, err := p.checkResult(r.result, f.Path, f.Optional); err != nil {
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
r.DataSet = f
|
||||
r.tag = true
|
||||
|
|
@ -649,3 +658,17 @@ func (p *Parser) convertType(input gjson.Result, desiredType string, name string
|
|||
|
||||
return input.Value(), nil
|
||||
}
|
||||
|
||||
func (p *Parser) checkResult(result gjson.Result, path string, optional bool) (bool, error) {
|
||||
if !result.Exists() {
|
||||
if optional {
|
||||
// If path is marked as optional don't error if path doesn't return a result
|
||||
p.Log.Debugf("the path %s doesn't exist", path)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, fmt.Errorf("the path %s doesn't exist", path)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/file"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -28,11 +27,11 @@ func TestMultipleConfigs(t *testing.T) {
|
|||
|
||||
expectedErrors := []struct {
|
||||
Name string
|
||||
Error error
|
||||
Error string
|
||||
}{
|
||||
{
|
||||
Name: "wrong_path",
|
||||
Error: fmt.Errorf(json_v2.GJSONPathNUllErrorMSG),
|
||||
Error: "wrong",
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -58,7 +57,7 @@ func TestMultipleConfigs(t *testing.T) {
|
|||
var expectedError bool
|
||||
for _, e := range expectedErrors {
|
||||
if e.Name == f.Name() {
|
||||
require.Equal(t, e.Error, err)
|
||||
require.Contains(t, err.Error(), e.Error)
|
||||
expectedError = true
|
||||
break
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
weight,customer_name=Customer,imei=123,serial_number=AX00 weight=289.799
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"weight_ROWKEY": "123",
|
||||
"weight_serialNumber": "AX00",
|
||||
"weight_createdAt": 1644708158939,
|
||||
"weight_weight": 289.799,
|
||||
"sensor_imei": "123",
|
||||
"sensor_distributor_name": null,
|
||||
"sensor_customer_name": "Customer",
|
||||
"sensor_dist_name": null
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
# Example taken from: https://github.com/influxdata/telegraf/issues/5940
|
||||
|
||||
[[inputs.file]]
|
||||
files = ["./testdata/10670/input.json"]
|
||||
data_format = "json_v2"
|
||||
[[inputs.file.json_v2]]
|
||||
measurement_name = "weight"
|
||||
timestamp_format = "unix_ms"
|
||||
timestamp_path = "weight_createdAt"
|
||||
[[inputs.file.json_v2.field]]
|
||||
path = "weight_weight"
|
||||
rename = "weight"
|
||||
type = "float"
|
||||
[[inputs.file.json_v2.tag]]
|
||||
path = "weight_serialNumber"
|
||||
rename = "serial_number"
|
||||
[[inputs.file.json_v2.tag]]
|
||||
path = "weight_ROWKEY"
|
||||
rename = "imei"
|
||||
[[inputs.file.json_v2.tag]]
|
||||
path = "sensor_customer_name"
|
||||
rename = "customer_name"
|
||||
[[inputs.file.json_v2.tag]]
|
||||
path = "sensor_distributor_name"
|
||||
rename = "distributor_name"
|
||||
|
|
@ -5,6 +5,9 @@
|
|||
files = ["./testdata/optional/input.json"]
|
||||
data_format = "json_v2"
|
||||
[[inputs.file.json_v2]]
|
||||
[[inputs.file.json_v2.field]]
|
||||
path = "wrong"
|
||||
optional = true
|
||||
[[inputs.file.json_v2.object]]
|
||||
path = "wrong"
|
||||
optional = true
|
||||
|
|
|
|||
Loading…
Reference in New Issue