feat(serializers.json): Support serializing JSON nested in string fields (#12260)

This commit is contained in:
Sven Rebhan 2022-11-18 11:53:13 +01:00 committed by GitHub
parent bc56233e1b
commit 0f8dff9b8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 200 additions and 33 deletions

View File

@ -1196,6 +1196,8 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldString(tbl, "json_transformation", &sc.Transformation)
c.getFieldStringSlice(tbl, "json_nested_fields_include", &sc.JSONNestedFieldInclude)
c.getFieldStringSlice(tbl, "json_nested_fields_exclude", &sc.JSONNestedFieldExclude)
c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
@ -1276,6 +1278,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator",
"influx_max_line_bytes", "influx_sort_fields", "influx_uint_support",
"json_timestamp_format", "json_timestamp_units", "json_transformation",
"json_nested_fields_include", "json_nested_fields_exclude",
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"prometheus_compact_encoding",
"splunkmetric_hec_routing", "splunkmetric_multimetric", "splunkmetric_omit_event_tag",

View File

@ -323,7 +323,10 @@ func (adx *AzureDataExplorer) Init() error {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}
serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
serializer, err := json.NewSerializer(json.FormatConfig{
TimestampUnits: time.Nanosecond,
TimestampFormat: time.RFC3339Nano,
})
if err != nil {
return err
}

View File

@ -140,7 +140,10 @@ func TestWrite(t *testing.T) {
for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
serializer, err := telegrafJson.NewSerializer(
telegrafJson.FormatConfig{
TimestampUnits: time.Second,
})
require.NoError(t, err)
plugin := AzureDataExplorer{
@ -264,7 +267,7 @@ func TestWriteWithType(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
serializer, err := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
for tableName, jsonValue := range testCase.tableNameToExpectedResult {
ingestionType := "queued"

View File

@ -42,7 +42,7 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
/* End wrapper interface */
func TestInitAndWrite(t *testing.T) {
serializer, err := json.NewSerializer(time.Second, "", "")
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
mockHub := &mockEventHub{}
e := &EventHubs{
@ -101,7 +101,7 @@ func TestInitAndWriteIntegration(t *testing.T) {
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
// Configure the plugin to target the newly created hub
serializer, err := json.NewSerializer(time.Second, "", "")
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
e := &EventHubs{
Hub: &eventHub{},

View File

@ -661,7 +661,7 @@ func TestBatchedUnbatched(t *testing.T) {
Method: defaultMethod,
}
jsonSerializer, err := json.NewSerializer(time.Second, "", "")
jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
s := map[string]serializers.Serializer{
"influx": influx.NewSerializer(),

View File

@ -28,7 +28,11 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "")
s, err := serializers.NewJSONSerializer(
&serializers.Config{
TimestampUnits: 10 * time.Second,
TimestampFormat: "yyy-dd-mmThh:mm:ss",
})
require.NoError(t, err)
st := &STOMP{
Host: url,

View File

@ -33,6 +33,13 @@ The `json` output data format converts metrics into JSON documents.
## This allows generating an arbitrary output form based on the metric(s). Please use
## multiline strings (starting and ending with three single-quotes) if needed.
#json_transformation = ""
## Filter for fields that contain nested JSON data.
## The serializer will try to decode matching STRING fields containing
## valid JSON. This is done BEFORE any JSON transformation. The filters
## can contain wildcards.
#json_nested_fields_include = []
#json_nested_fields_exclude = []
```
## Examples

View File

@ -10,29 +10,47 @@ import (
jsonata "github.com/blues/jsonata-go"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)
// FormatConfig contains the configuration options for constructing a JSON
// Serializer via NewSerializer.
type FormatConfig struct {
// TimestampUnits is the resolution metric timestamps are truncated to
// (passed through truncateDuration when the serializer is built).
TimestampUnits time.Duration
// TimestampFormat is an optional Go-style layout for rendering timestamps.
TimestampFormat string
// Transformation is an optional JSONata expression applied to the output;
// it is compiled with jsonata.Compile when the serializer is created.
Transformation string
// NestedFieldsInclude / NestedFieldsExclude are wildcard patterns combined
// into an include/exclude filter selecting which STRING fields should be
// decoded as nested JSON.
NestedFieldsInclude []string
NestedFieldsExclude []string
}
// Serializer converts Telegraf metrics into JSON documents.
type Serializer struct {
// TimestampUnits is the (already truncated) timestamp resolution.
TimestampUnits time.Duration
// TimestampFormat is the optional timestamp layout string.
TimestampFormat string
// transformation is the compiled JSONata expression, or nil if none.
transformation *jsonata.Expr
// nestedfields matches field keys whose string values should be decoded
// as nested JSON before serialization; nil disables nested decoding.
nestedfields filter.Filter
}
func NewSerializer(timestampUnits time.Duration, timestampFormat, transform string) (*Serializer, error) {
func NewSerializer(cfg FormatConfig) (*Serializer, error) {
s := &Serializer{
TimestampUnits: truncateDuration(timestampUnits),
TimestampFormat: timestampFormat,
TimestampUnits: truncateDuration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
}
if transform != "" {
e, err := jsonata.Compile(transform)
if cfg.Transformation != "" {
e, err := jsonata.Compile(cfg.Transformation)
if err != nil {
return nil, err
}
s.transformation = e
}
if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
f, err := filter.NewIncludeExcludeFilter(cfg.NestedFieldsInclude, cfg.NestedFieldsExclude)
if err != nil {
return nil, err
}
s.nestedfields = f
}
return s, nil
}
@ -99,13 +117,26 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}
fields := make(map[string]interface{}, len(metric.FieldList()))
for _, field := range metric.FieldList() {
if fv, ok := field.Value.(float64); ok {
val := field.Value
switch fv := field.Value.(type) {
case float64:
// JSON does not support these special values
if math.IsNaN(fv) || math.IsInf(fv, 0) {
continue
}
case string:
// Check for nested fields if any
if s.nestedfields != nil && s.nestedfields.Match(field.Key) {
bv := []byte(fv)
if json.Valid(bv) {
var nested interface{}
if err := json.Unmarshal(bv, &nested); err == nil {
val = nested
}
}
}
}
fields[field.Key] = field.Value
fields[field.Key] = val
}
m["fields"] = fields

View File

@ -29,7 +29,7 @@ func TestSerializeMetricFloat(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -90,7 +90,10 @@ func TestSerialize_TimestampUnits(t *testing.T) {
},
time.Unix(1525478795, 123456789),
)
s, err := NewSerializer(tt.timestampUnits, tt.timestampFormat, "")
s, err := NewSerializer(FormatConfig{
TimestampUnits: tt.timestampUnits,
TimestampFormat: tt.timestampFormat,
})
require.NoError(t, err)
actual, err := s.Serialize(m)
require.NoError(t, err)
@ -109,7 +112,7 @@ func TestSerializeMetricInt(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -128,7 +131,7 @@ func TestSerializeMetricString(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -148,7 +151,7 @@ func TestSerializeMultiFields(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -167,7 +170,7 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
}
m := metric.New("My CPU", tags, fields, now)
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -187,7 +190,7 @@ func TestSerializeBatch(t *testing.T) {
)
metrics := []telegraf.Metric{m, m}
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -211,7 +214,7 @@ func TestSerializeBatchSkipInf(t *testing.T) {
),
}
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -230,7 +233,7 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) {
),
}
s, err := NewSerializer(0, "", "")
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -266,7 +269,12 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
expected := expectedArray.([]interface{})
// Serialize
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
for i, m := range metrics {
buf, err := serializer.Serialize(m)
@ -275,8 +283,6 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
// Compare
var actual interface{}
require.NoError(t, json.Unmarshal(buf, &actual))
fmt.Printf("actual: %v\n", actual)
fmt.Printf("expected: %v\n", expected[i])
require.EqualValuesf(t, expected[i], actual, "mismatch in %d", i)
}
})
@ -311,7 +317,12 @@ func TestSerializeTransformationBatch(t *testing.T) {
require.NoError(t, err)
// Serialize
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
buf, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
@ -324,10 +335,70 @@ func TestSerializeTransformationBatch(t *testing.T) {
}
}
// TestSerializeNesting verifies that string fields matching the configured
// nested-field filters are decoded into JSON objects before serialization.
// Each case loads its filter settings and input metric from a testcase file
// and compares the serialized result against an expected JSON document.
func TestSerializeNesting(t *testing.T) {
	testCases := []struct {
		name     string
		filename string
		out      string
	}{
		{
			name:     "nested fields include",
			filename: "testcases/nested_fields_include.conf",
			out:      "testcases/nested_fields_out.json",
		},
		{
			name:     "nested fields exclude",
			filename: "testcases/nested_fields_exclude.conf",
			out:      "testcases/nested_fields_out.json",
		},
	}

	parser := &influx.Parser{}
	require.NoError(t, parser.Init())

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Load the serializer settings together with the header holding
			// the embedded input metric.
			cfg, header, err := loadTestConfiguration(filepath.FromSlash(tc.filename))
			require.NoError(t, err)

			// Parse exactly one input metric from the testcase header.
			metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
			require.NoError(t, err)
			require.Len(t, metrics, 1)

			// Load the expected output document.
			raw, err := loadJSON(tc.out)
			require.NoError(t, err)
			expected := raw.(map[string]interface{})

			// Build the serializer from the loaded configuration.
			serializer, err := NewSerializer(FormatConfig{
				TimestampUnits:      cfg.TimestampUnits,
				TimestampFormat:     cfg.TimestampFormat,
				Transformation:      cfg.Transformation,
				NestedFieldsInclude: cfg.JSONNestedFieldsInclude,
				NestedFieldsExclude: cfg.JSONNestedFieldsExclude,
			})
			require.NoError(t, err)

			buf, err := serializer.Serialize(metrics[0])
			require.NoError(t, err)

			// Compare the decoded serializer output with the expectation.
			var actual interface{}
			require.NoError(t, json.Unmarshal(buf, &actual))
			require.EqualValues(t, expected, actual)
		})
	}
}
type Config struct {
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
JSONNestedFieldsInclude []string `toml:"json_nested_fields_include"`
JSONNestedFieldsExclude []string `toml:"json_nested_fields_exclude"`
}
func loadTestConfiguration(filename string) (*Config, []string, error) {

View File

@ -0,0 +1,6 @@
# Example for decoding fields that contain nested JSON structures.
#
# Input:
# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000
json_nested_fields_exclude = ["hops", "latency"]

View File

@ -0,0 +1,6 @@
# Example for decoding fields that contain nested JSON structures.
#
# Input:
# in,host=myhost,type=diagnostic hops=10,latency=1.23,id-1234="{\"address\": \"AB1A\", \"status\": \"online\"}",id-0000="{\"status\": \"offline\"}",id-5678="{\"address\": \"0000\", \"status\": \"online\"}" 1666006350000000000
json_nested_fields_include = ["id-*"]

View File

@ -0,0 +1,23 @@
{
"fields": {
"id-1234": {
"address": "AB1A",
"status": "online"
},
"id-0000": {
"status": "offline"
},
"id-5678": {
"address": "0000",
"status": "online"
},
"hops": 10,
"latency": 1.23
},
"name": "in",
"tags": {
"host": "myhost",
"type": "diagnostic"
},
"timestamp": 1666006350
}

View File

@ -104,6 +104,10 @@ type Config struct {
// Transformation as JSONata expression to use for JSON formatted output
Transformation string `toml:"transformation"`
// Field filter for interpreting data as nested JSON for JSON serializer
JSONNestedFieldInclude []string `toml:"json_nested_fields_include"`
JSONNestedFieldExclude []string `toml:"json_nested_fields_exclude"`
// Include HEC routing fields for splunkmetric output
HecRouting bool `toml:"hec_routing"`
@ -157,7 +161,7 @@ func NewSerializer(config *Config) (Serializer, error) {
config.Templates,
)
case "json":
serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat, config.Transformation)
serializer, err = NewJSONSerializer(config)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil
case "nowmetric":
@ -232,8 +236,14 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
}
func NewJSONSerializer(timestampUnits time.Duration, timestampFormat, transform string) (Serializer, error) {
return json.NewSerializer(timestampUnits, timestampFormat, transform)
// NewJSONSerializer builds a JSON serializer from the generic serializer
// Config, mapping the json_* options onto the json package's FormatConfig.
func NewJSONSerializer(config *Config) (Serializer, error) {
	cfg := json.FormatConfig{
		TimestampUnits:      config.TimestampUnits,
		TimestampFormat:     config.TimestampFormat,
		Transformation:      config.Transformation,
		NestedFieldsInclude: config.JSONNestedFieldInclude,
		NestedFieldsExclude: config.JSONNestedFieldExclude,
	}
	return json.NewSerializer(cfg)
}
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {