diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index 2a2bfff50..dc783d66e 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -25,7 +25,7 @@ type Serializer struct { TimestampUnits time.Duration TimestampFormat string - transformation *jsonata.Expr + transformation string nestedfields filter.Filter } @@ -33,14 +33,7 @@ func NewSerializer(cfg FormatConfig) (*Serializer, error) { s := &Serializer{ TimestampUnits: truncateDuration(cfg.TimestampUnits), TimestampFormat: cfg.TimestampFormat, - } - - if cfg.Transformation != "" { - e, err := jsonata.Compile(cfg.Transformation) - if err != nil { - return nil, err - } - s.transformation = e + transformation: cfg.Transformation, } if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 { @@ -58,7 +51,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { var obj interface{} obj = s.createObject(metric) - if s.transformation != nil { + if s.transformation != "" { var err error if obj, err = s.transform(obj); err != nil { if errors.Is(err, jsonata.ErrUndefined) { @@ -89,7 +82,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { "metrics": objects, } - if s.transformation != nil { + if s.transformation != "" { var err error if obj, err = s.transform(obj); err != nil { if errors.Is(err, jsonata.ErrUndefined) { @@ -150,7 +143,12 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} } func (s *Serializer) transform(obj interface{}) (interface{}, error) { - return s.transformation.Eval(obj) + transformation, err := jsonata.Compile(s.transformation) + if err != nil { + return nil, err + } + + return transformation.Eval(obj) } func truncateDuration(units time.Duration) time.Duration { diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index a2d3c40dd..4526db272 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -335,6 +335,67 @@ func TestSerializeTransformationBatch(t *testing.T) { } } +func TestSerializeTransformationIssue12734(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "data", + map[string]string{"key": "a"}, + map[string]interface{}{"value": 10.1}, + time.Unix(0, 1676285135457000000), + ), + metric.New( + "data", + map[string]string{"key": "b"}, + map[string]interface{}{"value": 20.2}, + time.Unix(0, 1676285135457000000), + ), + metric.New( + "data", + map[string]string{"key": "c"}, + map[string]interface{}{"value": 30.3}, + time.Unix(0, 1676285135457000000), + ), + } + + transformation := ` + { + "valueRows": metrics{$string(timestamp): fields.value[]} ~> $each(function($v, $k) { + { + "timestamp": $number($k), + "values": $v + } + }) + } + ` + + expected := map[string]interface{}{ + "valueRows": map[string]interface{}{ + "timestamp": 1.676285135e+9, + "values": []interface{}{10.1, 20.2, 30.3}, + }, + } + + // Setup serializer + serializer, err := NewSerializer( + FormatConfig{ + Transformation: transformation, + }, + ) + require.NoError(t, err) + + // Check multiple serializations as issue #12734 shows that the + // transformation breaks after the first iteration + for i := 1; i <= 3; i++ { + buf, err := serializer.SerializeBatch(input) + require.NoErrorf(t, err, "broke in iteration %d", i) + + // Compare + var actual interface{} + require.NoError(t, json.Unmarshal(buf, &actual)) + require.EqualValuesf(t, expected, actual, "broke in iteration %d", i) + } +} + func TestSerializeNesting(t *testing.T) { var tests = []struct { name string