fix: timestamp change during execution of json_v2 parser. (#10657)

Co-authored-by: Kristian Grimsby <grimsby@Kristians-MacBook-Pro.local>
This commit is contained in:
Grimsby 2022-02-15 17:48:15 +01:00 committed by GitHub
parent 1e5daeda72
commit 6ae9320275
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 21 deletions

View File

@ -23,8 +23,6 @@ type Parser struct {
// measurementName is the the name of the current config used in each line protocol // measurementName is the the name of the current config used in each line protocol
measurementName string measurementName string
// timestamp is the timestamp used in each line protocol, defaults to time.Now()
timestamp time.Time
// **** Specific for object configuration **** // **** Specific for object configuration ****
// subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config // subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config
@ -112,10 +110,12 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
} }
// timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression // timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression
p.timestamp = time.Now() timestamp := time.Now()
if c.TimestampPath != "" { if c.TimestampPath != "" {
result := gjson.GetBytes(input, c.TimestampPath) result := gjson.GetBytes(input, c.TimestampPath)
if result.Type == gjson.Null { if result.Type == gjson.Null {
p.Log.Debugf("Message: %s", input)
return nil, fmt.Errorf(GJSONPathNUllErrorMSG) return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
} }
if !result.IsArray() && !result.IsObject() { if !result.IsArray() && !result.IsObject() {
@ -125,24 +125,25 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
} }
var err error var err error
p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone) timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
} }
fields, err := p.processMetric(input, c.Fields, false) fields, err := p.processMetric(input, c.Fields, false, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tags, err := p.processMetric(input, c.Tags, true) tags, err := p.processMetric(input, c.Tags, true, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
objects, err := p.processObjects(input, c.JSONObjects) objects, err := p.processObjects(input, c.JSONObjects, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -168,7 +169,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each // processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
// A field/tag can either be a single value or an array of values, each resulting in its own metric // A field/tag can either be a single value or an array of values, each resulting in its own metric
// For multiple configs, a set of metrics is created from the cartesian product of each separate config // For multiple configs, a set of metrics is created from the cartesian product of each separate config
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) { func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
if len(data) == 0 { if len(data) == 0 {
return nil, nil return nil, nil
} }
@ -207,13 +208,13 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegr
p.measurementName, p.measurementName,
map[string]string{}, map[string]string{},
map[string]interface{}{}, map[string]interface{}{},
p.timestamp, timestamp,
), ),
Result: result, Result: result,
} }
// Expand all array's and nested arrays into separate metrics // Expand all array's and nested arrays into separate metrics
nodes, err := p.expandArray(mNode) nodes, err := p.expandArray(mNode, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -259,7 +260,7 @@ func mergeMetric(a telegraf.Metric, m telegraf.Metric) {
} }
// expandArray will recursively create a new MetricNode for each element in a JSON array or single value // expandArray will recursively create a new MetricNode for each element in a JSON array or single value
func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) { func (p *Parser) expandArray(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric var results []telegraf.Metric
if result.IsObject() { if result.IsObject() {
@ -267,7 +268,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects") p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects")
return results, nil return results, nil
} }
r, err := p.combineObject(result) r, err := p.combineObject(result, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,14 +286,14 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
p.measurementName, p.measurementName,
map[string]string{}, map[string]string{},
map[string]interface{}{}, map[string]interface{}{},
p.timestamp, timestamp,
) )
if val.IsObject() { if val.IsObject() {
n := result n := result
n.ParentIndex += val.Index n.ParentIndex += val.Index
n.Metric = m n.Metric = m
n.Result = val n.Result = val
r, err := p.combineObject(n) r, err := p.combineObject(n, timestamp)
if err != nil { if err != nil {
return false return false
} }
@ -311,7 +312,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
n.ParentIndex += val.Index n.ParentIndex += val.Index
n.Metric = m n.Metric = m
n.Result = val n.Result = val
r, err := p.expandArray(n) r, err := p.expandArray(n, timestamp)
if err != nil { if err != nil {
return false return false
} }
@ -400,7 +401,7 @@ func (p *Parser) existsInpathResults(index int) *PathResult {
} }
// processObjects will iterate over all 'object' configs and create metrics for each // processObjects will iterate over all 'object' configs and create metrics for each
func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) { func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp time.Time) ([]telegraf.Metric, error) {
p.iterateObjects = true p.iterateObjects = true
var t []telegraf.Metric var t []telegraf.Metric
for _, c := range objects { for _, c := range objects {
@ -449,11 +450,11 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
p.measurementName, p.measurementName,
map[string]string{}, map[string]string{},
map[string]interface{}{}, map[string]interface{}{},
p.timestamp, timestamp,
), ),
Result: result, Result: result,
} }
metrics, err := p.expandArray(rootObject) metrics, err := p.expandArray(rootObject, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -465,7 +466,7 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
// combineObject will add all fields/tags to a single metric // combineObject will add all fields/tags to a single metric
// If the object has multiple array's as elements it won't comine those, they will remain separate metrics // If the object has multiple array's as elements it won't comine those, they will remain separate metrics
func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) { func (p *Parser) combineObject(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric var results []telegraf.Metric
if result.IsArray() || result.IsObject() { if result.IsArray() || result.IsObject() {
var err error var err error
@ -519,12 +520,12 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
arrayNode.Tag = tag arrayNode.Tag = tag
if val.IsObject() { if val.IsObject() {
results, err = p.combineObject(arrayNode) results, err = p.combineObject(arrayNode, timestamp)
if err != nil { if err != nil {
return false return false
} }
} else { } else {
r, err := p.expandArray(arrayNode) r, err := p.expandArray(arrayNode, timestamp)
if err != nil { if err != nil {
return false return false
} }