package json_v2

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/metric"
	"github.com/tidwall/gjson"
)

// Parser adheres to the parser interface, contains the parser configuration, and data required to parse JSON
type Parser struct {
	// These struct fields are common for a parser
	Configs     []Config
	DefaultTags map[string]string
	Log         telegraf.Logger

	// **** The struct fields below this comment are used for processing individual configs ****

	// measurementName is the name of the current config used in each line protocol
	measurementName string

	// timestamp is the timestamp used in each line protocol, defaults to time.Now()
	timestamp time.Time

	// **** Specific for object configuration ****

	// subPathResults contains the results of the sub-gjson path expressions provided in the fields/tags tables within the object config
	subPathResults []PathResult

	// iterateObjects dictates if the expandArray function will handle objects
	iterateObjects bool

	// objectConfig contains the config for an object, some info is needed while iterating over the gjson results
	objectConfig JSONObject
}

type PathResult struct {
	result gjson.Result
	tag    bool
	DataSet
}

type Config struct {
	MeasurementName     string `toml:"measurement_name"`      // OPTIONAL
	MeasurementNamePath string `toml:"measurement_name_path"` // OPTIONAL
	TimestampPath       string `toml:"timestamp_path"`        // OPTIONAL
	TimestampFormat     string `toml:"timestamp_format"`      // OPTIONAL, but REQUIRED when timestamp_path is defined
	TimestampTimezone   string `toml:"timestamp_timezone"`    // OPTIONAL, but REQUIRES timestamp_path

	Fields      []DataSet
	Tags        []DataSet
	JSONObjects []JSONObject
}

type DataSet struct {
	Path   string `toml:"path"`   // REQUIRED
	Type   string `toml:"type"`   // OPTIONAL, can't be set for tags as they will always be strings
	Rename string `toml:"rename"` // OPTIONAL
}

type JSONObject struct {
	Path               string            `toml:"path"`                 // REQUIRED
	TimestampKey       string            `toml:"timestamp_key"`        // OPTIONAL
	TimestampFormat    string            `toml:"timestamp_format"`     // OPTIONAL, but REQUIRED when timestamp_key is defined
	TimestampTimezone  string            `toml:"timestamp_timezone"`   // OPTIONAL, but REQUIRES timestamp_key
	Renames            map[string]string `toml:"renames"`              // OPTIONAL
	Fields             map[string]string `toml:"fields"`               // OPTIONAL
	Tags               []string          `toml:"tags"`                 // OPTIONAL
	IncludedKeys       []string          `toml:"included_keys"`        // OPTIONAL
	ExcludedKeys       []string          `toml:"excluded_keys"`        // OPTIONAL
	DisablePrependKeys bool              `toml:"disable_prepend_keys"` // OPTIONAL
	FieldPaths         []DataSet         // OPTIONAL
	TagPaths           []DataSet         // OPTIONAL
}
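
// Illustrative sketch (an assumption, not taken from the plugin's documentation) of
// how a parser configuration maps onto the structs above; the JSON keys "name",
// "time", "value" and "location" are hypothetical:
//
//	cfg := Config{
//		MeasurementNamePath: "name",
//		TimestampPath:       "time",
//		TimestampFormat:     "unix",
//		Fields:              []DataSet{{Path: "value", Type: "float"}},
//		Tags:                []DataSet{{Path: "location"}},
//	}
//
// In a Telegraf TOML file these values would typically be filled in from the json_v2
// tables (e.g. [[inputs.file.json_v2.field]] and [[inputs.file.json_v2.tag]]).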

type MetricNode struct {
	ParentIndex int
	OutputName  string
	SetName     string
	Tag         bool
	DesiredType string // Can be "int", "uint", "float", "bool", "string"
	/*
		IncludeCollection is only used when processing objects and is responsible for containing the gjson results
		found by the gjson paths provided in the FieldPaths and TagPaths configs.
	*/
	IncludeCollection *PathResult

	Metric telegraf.Metric
	gjson.Result
}

func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
	// Only valid JSON is supported
	if !gjson.Valid(string(input)) {
		return nil, fmt.Errorf("invalid JSON provided, unable to parse")
	}

	var metrics []telegraf.Metric

	for _, c := range p.Configs {
		// Measurement name can either be hardcoded, or parsed from the JSON using a GJSON path expression
		p.measurementName = c.MeasurementName
		if c.MeasurementNamePath != "" {
			result := gjson.GetBytes(input, c.MeasurementNamePath)
			if !result.IsArray() && !result.IsObject() {
				p.measurementName = result.String()
			}
		}

		// timestamp defaults to the current time, or can be parsed from the JSON using a GJSON path expression
		p.timestamp = time.Now()
		if c.TimestampPath != "" {
			result := gjson.GetBytes(input, c.TimestampPath)
			if !result.IsArray() && !result.IsObject() {
				if c.TimestampFormat == "" {
					err := fmt.Errorf("use of 'timestamp_path' requires 'timestamp_format'")
					return nil, err
				}

				var err error
				p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.Value(), c.TimestampTimezone)
				if err != nil {
					return nil, err
				}
			}
		}

		fields, err := p.processMetric(input, c.Fields, false)
		if err != nil {
			return nil, err
		}

		tags, err := p.processMetric(input, c.Tags, true)
		if err != nil {
			return nil, err
		}

		objects, err := p.processObjects(input, c.JSONObjects)
		if err != nil {
			return nil, err
		}

		metrics = append(metrics, cartesianProduct(tags, fields)...)

		if len(objects) != 0 && len(metrics) != 0 {
			metrics = cartesianProduct(objects, metrics)
		} else {
			metrics = append(metrics, objects...)
		}
	}

	for k, v := range p.DefaultTags {
		for _, t := range metrics {
			t.AddTag(k, v)
		}
	}

	return metrics, nil
}
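
// A minimal usage sketch (the input JSON and the expected output are assumptions,
// not taken from the plugin's tests):
//
//	p := &Parser{
//		Log: testutil.Logger{}, // any telegraf.Logger implementation works here
//		Configs: []Config{{
//			MeasurementName: "sensor",
//			Fields:          []DataSet{{Path: "temp", Type: "float"}},
//		}},
//	}
//	metrics, err := p.Parse([]byte(`{"temp": "21.5"}`))
//	// -> one metric named "sensor" with the field temp=21.5 (the string is
//	//    converted to a float because of the configured Type)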

// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
// A field/tag can either be a single value or an array of values, each resulting in its own metric
// For multiple configs, a set of metrics is created from the cartesian product of each separate config
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) {
	if len(data) == 0 {
		return nil, nil
	}

	p.iterateObjects = false
	var metrics [][]telegraf.Metric

	for _, c := range data {
		if c.Path == "" {
			return nil, fmt.Errorf("GJSON path is required")
		}
		result := gjson.GetBytes(input, c.Path)

		if result.IsObject() {
			p.Log.Debugf("Found object in the path %s; ignoring it, please use 'object' to gather metrics from objects", c.Path)
			continue
		}

		setName := c.Rename
		// Default to the last word of the path, which should be the key name
		if setName == "" {
			s := strings.Split(c.Path, ".")
			setName = s[len(s)-1]
		}
		setName = strings.ReplaceAll(setName, " ", "_")

		mNode := MetricNode{
			OutputName:  setName,
			SetName:     setName,
			DesiredType: c.Type,
			Tag:         tag,
			Metric: metric.New(
				p.measurementName,
				map[string]string{},
				map[string]interface{}{},
				p.timestamp,
			),
			Result: result,
		}

		// Expand all arrays and nested arrays into separate metrics
		nodes, err := p.expandArray(mNode)
		if err != nil {
			return nil, err
		}

		metrics = append(metrics, nodes)
	}

	for i := 1; i < len(metrics); i++ {
		metrics[i] = cartesianProduct(metrics[i-1], metrics[i])
	}

	return metrics[len(metrics)-1], nil
}
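
// Expansion sketch (the input is hypothetical): with a field config of path "values"
// and the input {"values": [1, 2, 3]}, expandArray produces three metrics, each
// carrying a single field named "values". When several field/tag configs are given,
// result sets of lengths m and n are combined into m*n metrics via cartesianProduct.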

func cartesianProduct(a, b []telegraf.Metric) []telegraf.Metric {
	if len(a) == 0 {
		return b
	}
	if len(b) == 0 {
		return a
	}
	p := make([]telegraf.Metric, len(a)*len(b))
	i := 0
	for _, a := range a {
		for _, b := range b {
			m := a.Copy()
			mergeMetric(b, m)
			p[i] = m
			i++
		}
	}

	return p
}
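
// Product sketch (values are illustrative): given tag metrics [{site=a}, {site=b}]
// and field metrics [{v=1}, {v=2}, {v=3}], cartesianProduct returns the six merged
// metrics {site=a,v=1} ... {site=b,v=3}; each element of b is merged onto a copy of
// each element of a.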

func mergeMetric(a telegraf.Metric, m telegraf.Metric) {
	for _, f := range a.FieldList() {
		m.AddField(f.Key, f.Value)
	}
	for _, t := range a.TagList() {
		m.AddTag(t.Key, t.Value)
	}
}

// expandArray will recursively create a new MetricNode for each element in a JSON array or single value
func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
	var results []telegraf.Metric

	if result.IsObject() {
		if !p.iterateObjects {
			p.Log.Debugf("Found object in query; ignoring it, please use 'object' to gather metrics from objects")
			return results, nil
		}
		if result.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) {
			result.IncludeCollection = p.existsInpathResults(result.Index, result.Raw)
		}
		r, err := p.combineObject(result)
		if err != nil {
			return nil, err
		}
		results = append(results, r...)
		return results, nil
	}

	if result.IsArray() {
		var err error
		if result.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) {
			result.IncludeCollection = p.existsInpathResults(result.Index, result.Raw)
		}
		result.ForEach(func(_, val gjson.Result) bool {
			m := metric.New(
				p.measurementName,
				map[string]string{},
				map[string]interface{}{},
				p.timestamp,
			)
			if val.IsObject() {
				if p.iterateObjects {
					n := result
					n.ParentIndex += val.Index
					n.Metric = m
					n.Result = val
					if n.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) {
						n.IncludeCollection = p.existsInpathResults(n.Index, n.Raw)
					}
					r, err := p.combineObject(n)
					if err != nil {
						return false
					}

					results = append(results, r...)
				} else {
					p.Log.Debugf("Found object in query; ignoring it, please use 'object' to gather metrics from objects")
				}
				if len(results) != 0 {
					for _, newResult := range results {
						mergeMetric(result.Metric, newResult)
					}
				}
				return true
			}

			for _, f := range result.Metric.FieldList() {
				m.AddField(f.Key, f.Value)
			}
			for _, f := range result.Metric.TagList() {
				m.AddTag(f.Key, f.Value)
			}
			n := result
			n.ParentIndex += val.Index
			n.Metric = m
			n.Result = val
			if n.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) {
				n.IncludeCollection = p.existsInpathResults(n.Index, n.Raw)
			}
			r, err := p.expandArray(n)
			if err != nil {
				return false
			}
			results = append(results, r...)
			return true
		})
		if err != nil {
			return nil, err
		}
	} else {
		if result.SetName == p.objectConfig.TimestampKey {
			if p.objectConfig.TimestampFormat == "" {
				err := fmt.Errorf("use of 'timestamp_key' requires 'timestamp_format'")
				return nil, err
			}
			timestamp, err := internal.ParseTimestamp(p.objectConfig.TimestampFormat, result.Value(), p.objectConfig.TimestampTimezone)
			if err != nil {
				return nil, err
			}
			result.Metric.SetTime(timestamp)
		} else {
			switch result.Value().(type) {
			case nil: // Ignore JSON values that are set as null
			default:
				outputName := result.OutputName
				desiredType := result.DesiredType

				if len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0 {
					var pathResult *PathResult
					// When IncludeCollection isn't nil, that means the current result is included in the collection.
					if result.IncludeCollection != nil {
						pathResult = result.IncludeCollection
					} else {
						// Verify that the result should be included based on the results of the field paths and tag paths
						pathResult = p.existsInpathResults(result.ParentIndex, result.Raw)
					}
					if pathResult == nil {
						return results, nil
					}
					if pathResult.tag {
						result.Tag = true
					}
					if !pathResult.tag {
						desiredType = pathResult.Type
					}
					if pathResult.Rename != "" {
						outputName = pathResult.Rename
					}
				}

				if result.Tag {
					desiredType = "string"
				}
				v, err := p.convertType(result.Result, desiredType, result.SetName)
				if err != nil {
					return nil, err
				}
				if result.Tag {
					result.Metric.AddTag(outputName, v.(string))
				} else {
					result.Metric.AddField(outputName, v)
				}
			}
		}

		results = append(results, result.Metric)
	}

	return results, nil
}

func (p *Parser) existsInpathResults(index int, raw string) *PathResult {
	for _, f := range p.subPathResults {
		if f.result.Index == 0 {
			for _, i := range f.result.Indexes {
				if i == index {
					return &f
				}
			}
		} else if f.result.Index == index {
			return &f
		}
	}
	return nil
}

// processObjects will iterate over all 'object' configs and create metrics for each
func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) {
	p.iterateObjects = true
	var t []telegraf.Metric
	for _, c := range objects {
		p.objectConfig = c

		if c.Path == "" {
			return nil, fmt.Errorf("GJSON path is required")
		}
		result := gjson.GetBytes(input, c.Path)

		scopedJSON := []byte(result.Raw)
		for _, f := range c.FieldPaths {
			var r PathResult
			r.result = gjson.GetBytes(scopedJSON, f.Path)
			r.DataSet = f
			p.subPathResults = append(p.subPathResults, r)
		}

		for _, f := range c.TagPaths {
			var r PathResult
			r.result = gjson.GetBytes(scopedJSON, f.Path)
			r.DataSet = f
			r.tag = true
			p.subPathResults = append(p.subPathResults, r)
		}

		if result.Type == gjson.Null {
			return nil, fmt.Errorf("GJSON Path returned null")
		}

		rootObject := MetricNode{
			ParentIndex: 0,
			Metric: metric.New(
				p.measurementName,
				map[string]string{},
				map[string]interface{}{},
				p.timestamp,
			),
			Result: result,
		}
		metrics, err := p.expandArray(rootObject)
		if err != nil {
			return nil, err
		}
		t = append(t, metrics...)
	}

	return t, nil
}
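
// Object sketch (the JSON shape is hypothetical): with path = "devices", tags = ["id"]
// and the input {"devices": [{"id": "a", "temp": 1}, {"id": "b", "temp": 2}]},
// processObjects yields two metrics, each with the tag id and the field temp, because
// expandArray and combineObject walk every element of the array.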

// combineObject will add all fields/tags to a single metric
// If the object has multiple arrays as elements it won't combine those, they will remain separate metrics
func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
	var results []telegraf.Metric
	if result.IsArray() || result.IsObject() {
		var err error
		result.ForEach(func(key, val gjson.Result) bool {
			// Determine if field/tag set name is configured
			var setName string
			if result.SetName != "" {
				setName = result.SetName + "_" + strings.ReplaceAll(key.String(), " ", "_")
			} else {
				setName = strings.ReplaceAll(key.String(), " ", "_")
			}

			if p.isExcluded(setName) || !p.isIncluded(setName, val) {
				return true
			}

			var outputName string
			if p.objectConfig.DisablePrependKeys {
				outputName = strings.ReplaceAll(key.String(), " ", "_")
			} else {
				outputName = setName
			}
			for k, n := range p.objectConfig.Renames {
				if k == setName {
					outputName = n
					break
				}
			}

			arrayNode := result
			arrayNode.ParentIndex += val.Index
			arrayNode.OutputName = outputName
			arrayNode.SetName = setName
			arrayNode.Result = val

			for k, t := range p.objectConfig.Fields {
				if setName == k {
					arrayNode.DesiredType = t
					break
				}
			}

			tag := false
			for _, t := range p.objectConfig.Tags {
				if setName == t {
					tag = true
					break
				}
			}

			arrayNode.Tag = tag

			if val.IsObject() {
				results, err = p.combineObject(arrayNode)
				if err != nil {
					return false
				}
			} else {
				r, err := p.expandArray(arrayNode)
				if err != nil {
					return false
				}
				results = cartesianProduct(results, r)
			}

			return true
		})

		if err != nil {
			return nil, err
		}
	}
	return results, nil
}
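
// Naming sketch (the object is hypothetical): for {"stats": {"min": 1, "max": 2}}
// reached through an object config, the nested keys become the fields "stats_min"
// and "stats_max"; with disable_prepend_keys = true they are emitted as "min" and
// "max" instead.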

func (p *Parser) isIncluded(key string, val gjson.Result) bool {
	if len(p.objectConfig.IncludedKeys) == 0 {
		return true
	}
	// automatically adds tags to included_keys so they do NOT have to be repeated in the config
	allKeys := append(p.objectConfig.IncludedKeys, p.objectConfig.Tags...)
	for _, i := range allKeys {
		if i == key {
			return true
		}
		if val.IsArray() || val.IsObject() {
			// Check if the included key is a sub element
			if strings.HasPrefix(i, key) {
				return true
			}
		}
	}
	return false
}

func (p *Parser) isExcluded(key string) bool {
	for _, i := range p.objectConfig.ExcludedKeys {
		if i == key {
			return true
		}
	}
	return false
}

func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	return nil, fmt.Errorf("ParseLine is designed for parsing influx line protocol and therefore not implemented for parsing JSON")
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
	p.DefaultTags = tags
}

// convertType will convert the value parsed from the input JSON to the specified type in the config
func (p *Parser) convertType(input gjson.Result, desiredType string, name string) (interface{}, error) {
	switch inputType := input.Value().(type) {
	case string:
		if desiredType != "string" {
			switch desiredType {
			case "uint":
				r, err := strconv.ParseUint(inputType, 10, 64)
				if err != nil {
					return nil, fmt.Errorf("unable to convert field '%s' to type uint: %v", name, err)
				}
				return r, nil
			case "int":
				r, err := strconv.ParseInt(inputType, 10, 64)
				if err != nil {
					return nil, fmt.Errorf("unable to convert field '%s' to type int: %v", name, err)
				}
				return r, nil
			case "float":
				r, err := strconv.ParseFloat(inputType, 64)
				if err != nil {
					return nil, fmt.Errorf("unable to convert field '%s' to type float: %v", name, err)
				}
				return r, nil
			case "bool":
				r, err := strconv.ParseBool(inputType)
				if err != nil {
					return nil, fmt.Errorf("unable to convert field '%s' to type bool: %v", name, err)
				}
				return r, nil
			}
		}
	case bool:
		switch desiredType {
		case "string":
			return strconv.FormatBool(inputType), nil
		case "int":
			if inputType {
				return int64(1), nil
			}

			return int64(0), nil
		case "uint":
			if inputType {
				return uint64(1), nil
			}

			return uint64(0), nil
		}
	case float64:
		if desiredType != "float" {
			switch desiredType {
			case "string":
				return fmt.Sprint(inputType), nil
			case "int":
				return input.Int(), nil
			case "uint":
				return input.Uint(), nil
			case "bool":
				if inputType == 0 {
					return false, nil
				} else if inputType == 1 {
					return true, nil
				} else {
					return nil, fmt.Errorf("unable to convert field '%s' to type bool", name)
				}
			}
		}
	default:
		return nil, fmt.Errorf("unknown format '%T' for field '%s'", inputType, name)
	}

	return input.Value(), nil
}
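
// Conversion sketch (values are illustrative): a JSON string "42" with type = "int"
// becomes int64(42), a JSON number 1 with type = "bool" becomes true, and a value
// with no configured type is returned exactly as gjson parsed it (string, float64,
// bool, etc.).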