diff --git a/plugins/parsers/json_v2/parser.go b/plugins/parsers/json_v2/parser.go index ebeb6545b..c0570fddd 100644 --- a/plugins/parsers/json_v2/parser.go +++ b/plugins/parsers/json_v2/parser.go @@ -12,19 +12,27 @@ import ( "github.com/tidwall/gjson" ) +// Parser adheres to the parser interface, contains the parser configuration, and data required to parse JSON type Parser struct { - InputJSON []byte + // These struct fields are common for a parser Configs []Config DefaultTags map[string]string Log telegraf.Logger - Timestamp time.Time + // **** The struct fields bellow this comment are used for processing indvidual configs **** + + // measurementName is the the name of the current config used in each line protocol measurementName string + // timestamp is the timestamp used in each line protocol, defaults to time.Now() + timestamp time.Time + // **** Specific for object configuration **** + // subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config + subPathResults []PathResult + // iterateObjects dictates if ExpandArray function will handle objects iterateObjects bool - - currentSettings JSONObject - pathResults []PathResult + // objectConfig contains the config for an object, some info is needed while iterating over the gjson results + objectConfig JSONObject } type PathResult struct { @@ -83,28 +91,27 @@ type MetricNode struct { } func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { - p.InputJSON = input // Only valid JSON is supported - if !gjson.Valid(string(p.InputJSON)) { - return nil, fmt.Errorf("Invalid JSON provided, unable to parse") + if !gjson.Valid(string(input)) { + return nil, fmt.Errorf("invalid JSON provided, unable to parse") } var metrics []telegraf.Metric for _, c := range p.Configs { - // Measurement name configuration + // Measurement name can either be hardcoded, or parsed from the JSON using a GJSON path expression p.measurementName = c.MeasurementName if c.MeasurementNamePath != "" { - result := gjson.GetBytes(p.InputJSON, c.MeasurementNamePath) + result := gjson.GetBytes(input, c.MeasurementNamePath) if !result.IsArray() && !result.IsObject() { p.measurementName = result.String() } } - // Timestamp configuration - p.Timestamp = time.Now() + // timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression + p.timestamp = time.Now() if c.TimestampPath != "" { - result := gjson.GetBytes(p.InputJSON, c.TimestampPath) + result := gjson.GetBytes(input, c.TimestampPath) if !result.IsArray() && !result.IsObject() { if c.TimestampFormat == "" { err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'") @@ -112,24 +119,24 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { } var err error - p.Timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.Value(), c.TimestampTimezone) + p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.Value(), c.TimestampTimezone) if err != nil { return nil, err } } } - fields, err := p.processMetric(c.Fields, false) + fields, err := p.processMetric(input, c.Fields, false) if err != nil { return nil, err } - tags, err := p.processMetric(c.Tags, true) + tags, err := p.processMetric(input, c.Tags, true) if err != nil { return nil, err } - objects, err := p.processObjects(c.JSONObjects) + objects, err := p.processObjects(input, c.JSONObjects) if err != nil { return nil, err } @@ -155,7 +162,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { // processMetric will iterate over all 'field' or 'tag' configs and create metrics for each // A field/tag can either be a single value or an array of values, each resulting in its own metric // For multiple configs, a set of metrics is created from the cartesian product of each separate config -func (p *Parser) processMetric(data []DataSet, tag bool) ([]telegraf.Metric, error) { +func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) { if len(data) == 0 { return nil, nil } @@ -167,7 +174,7 @@ func (p *Parser) processMetric(data []DataSet, tag bool) ([]telegraf.Metric, err if c.Path == "" { return nil, fmt.Errorf("GJSON path is required") } - result := gjson.GetBytes(p.InputJSON, c.Path) + result := gjson.GetBytes(input, c.Path) if result.IsObject() { p.Log.Debugf("Found object in the path: %s, ignoring it please use 'object' to gather metrics from objects", c.Path) @@ -191,7 +198,7 @@ func (p *Parser) processMetric(data []DataSet, tag bool) ([]telegraf.Metric, err p.measurementName, map[string]string{}, map[string]interface{}{}, - p.Timestamp, + p.timestamp, ), Result: result, } @@ -251,7 +258,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects") return results, nil } - if result.IncludeCollection == nil && (len(p.currentSettings.FieldPaths) > 0 || len(p.currentSettings.TagPaths) > 0) { + if result.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) { result.IncludeCollection = p.existsInpathResults(result.Index, result.Raw) } r, err := p.combineObject(result) @@ -264,7 +271,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { if result.IsArray() { var err error - if result.IncludeCollection == nil && (len(p.currentSettings.FieldPaths) > 0 || len(p.currentSettings.TagPaths) > 0) { + if result.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) { result.IncludeCollection = p.existsInpathResults(result.Index, result.Raw) } result.ForEach(func(_, val gjson.Result) bool { @@ -272,7 +279,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { p.measurementName, map[string]string{}, map[string]interface{}{}, - p.Timestamp, + p.timestamp, ) if val.IsObject() { if p.iterateObjects { @@ -280,7 +287,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { n.ParentIndex += val.Index n.Metric = m n.Result = val - if n.IncludeCollection == nil && (len(p.currentSettings.FieldPaths) > 0 || len(p.currentSettings.TagPaths) > 0) { + if n.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) { n.IncludeCollection = p.existsInpathResults(n.Index, n.Raw) } r, err := p.combineObject(n) @@ -310,7 +317,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { n.ParentIndex += val.Index n.Metric = m n.Result = val - if n.IncludeCollection == nil && (len(p.currentSettings.FieldPaths) > 0 || len(p.currentSettings.TagPaths) > 0) { + if n.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) { n.IncludeCollection = p.existsInpathResults(n.Index, n.Raw) } r, err := p.expandArray(n) @@ -324,12 +331,12 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { return nil, err } } else { - if result.SetName == p.currentSettings.TimestampKey { - if p.currentSettings.TimestampFormat == "" { + if result.SetName == p.objectConfig.TimestampKey { + if p.objectConfig.TimestampFormat == "" { err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'") return nil, err } - timestamp, err := internal.ParseTimestamp(p.currentSettings.TimestampFormat, result.Value(), p.currentSettings.TimestampTimezone) + timestamp, err := internal.ParseTimestamp(p.objectConfig.TimestampFormat, result.Value(), p.objectConfig.TimestampTimezone) if err != nil { return nil, err } @@ -341,7 +348,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { outputName := result.OutputName desiredType := result.DesiredType - if len(p.currentSettings.FieldPaths) > 0 || len(p.currentSettings.TagPaths) > 0 { + if len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0 { var pathResult *PathResult // When IncludeCollection isn't nil, that means the current result is included in the collection. if result.IncludeCollection != nil { @@ -386,7 +393,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { } func (p *Parser) existsInpathResults(index int, raw string) *PathResult { - for _, f := range p.pathResults { + for _, f := range p.subPathResults { if f.result.Index == 0 { for _, i := range f.result.Indexes { if i == index { @@ -401,23 +408,23 @@ func (p *Parser) existsInpathResults(index int, raw string) *PathResult { } // processObjects will iterate over all 'object' configs and create metrics for each -func (p *Parser) processObjects(objects []JSONObject) ([]telegraf.Metric, error) { +func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) { p.iterateObjects = true var t []telegraf.Metric for _, c := range objects { - p.currentSettings = c + p.objectConfig = c if c.Path == "" { return nil, fmt.Errorf("GJSON path is required") } - result := gjson.GetBytes(p.InputJSON, c.Path) + result := gjson.GetBytes(input, c.Path) scopedJSON := []byte(result.Raw) for _, f := range c.FieldPaths { var r PathResult r.result = gjson.GetBytes(scopedJSON, f.Path) r.DataSet = f - p.pathResults = append(p.pathResults, r) + p.subPathResults = append(p.subPathResults, r) } for _, f := range c.TagPaths { @@ -425,7 +432,7 @@ func (p *Parser) processObjects(objects []JSONObject) ([]telegraf.Metric, error) r.result = gjson.GetBytes(scopedJSON, f.Path) r.DataSet = f r.tag = true - p.pathResults = append(p.pathResults, r) + p.subPathResults = append(p.subPathResults, r) } if result.Type == gjson.Null { @@ -438,7 +445,7 @@ func (p *Parser) processObjects(objects []JSONObject) ([]telegraf.Metric, error) p.measurementName, map[string]string{}, map[string]interface{}{}, - p.Timestamp, + p.timestamp, ), Result: result, } @@ -472,12 +479,12 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { } var outputName string - if p.currentSettings.DisablePrependKeys { + if p.objectConfig.DisablePrependKeys { outputName = strings.ReplaceAll(key.String(), " ", "_") } else { outputName = setName } - for k, n := range p.currentSettings.Renames { + for k, n := range p.objectConfig.Renames { if k == setName { outputName = n break @@ -490,7 +497,7 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { arrayNode.SetName = setName arrayNode.Result = val - for k, t := range p.currentSettings.Fields { + for k, t := range p.objectConfig.Fields { if setName == k { arrayNode.DesiredType = t break @@ -498,7 +505,7 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { } tag := false - for _, t := range p.currentSettings.Tags { + for _, t := range p.objectConfig.Tags { if setName == t { tag = true break @@ -531,11 +538,11 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { } func (p *Parser) isIncluded(key string, val gjson.Result) bool { - if len(p.currentSettings.IncludedKeys) == 0 { + if len(p.objectConfig.IncludedKeys) == 0 { return true } // automatically adds tags to included_keys so it does NOT have to be repeated in the config - allKeys := append(p.currentSettings.IncludedKeys, p.currentSettings.Tags...) + allKeys := append(p.objectConfig.IncludedKeys, p.objectConfig.Tags...) for _, i := range allKeys { if i == key { return true @@ -551,7 +558,7 @@ func (p *Parser) isIncluded(key string, val gjson.Result) bool { } func (p *Parser) isExcluded(key string) bool { - for _, i := range p.currentSettings.ExcludedKeys { + for _, i := range p.objectConfig.ExcludedKeys { if i == key { return true } @@ -576,25 +583,25 @@ func (p *Parser) convertType(input gjson.Result, desiredType string, name string case "uint": r, err := strconv.ParseUint(inputType, 10, 64) if err != nil { - return nil, fmt.Errorf("Unable to convert field '%s' to type uint: %v", name, err) + return nil, fmt.Errorf("unable to convert field '%s' to type uint: %v", name, err) } return r, nil case "int": r, err := strconv.ParseInt(inputType, 10, 64) if err != nil { - return nil, fmt.Errorf("Unable to convert field '%s' to type int: %v", name, err) + return nil, fmt.Errorf("unable to convert field '%s' to type int: %v", name, err) } return r, nil case "float": r, err := strconv.ParseFloat(inputType, 64) if err != nil { - return nil, fmt.Errorf("Unable to convert field '%s' to type float: %v", name, err) + return nil, fmt.Errorf("unable to convert field '%s' to type float: %v", name, err) } return r, nil case "bool": r, err := strconv.ParseBool(inputType) if err != nil { - return nil, fmt.Errorf("Unable to convert field '%s' to type bool: %v", name, err) + return nil, fmt.Errorf("unable to convert field '%s' to type bool: %v", name, err) } return r, nil } @@ -631,7 +638,7 @@ func (p *Parser) convertType(input gjson.Result, desiredType string, name string } else if inputType == 1 { return true, nil } else { - return nil, fmt.Errorf("Unable to convert field '%s' to type bool", name) + return nil, fmt.Errorf("unable to convert field '%s' to type bool", name) } } }