fix(serializers.json): allow stateful transformations (#12735)
This commit is contained in:
parent
e51b3810ab
commit
a90b6eb119
|
|
@ -25,7 +25,7 @@ type Serializer struct {
|
||||||
TimestampUnits time.Duration
|
TimestampUnits time.Duration
|
||||||
TimestampFormat string
|
TimestampFormat string
|
||||||
|
|
||||||
transformation *jsonata.Expr
|
transformation string
|
||||||
nestedfields filter.Filter
|
nestedfields filter.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -33,14 +33,7 @@ func NewSerializer(cfg FormatConfig) (*Serializer, error) {
|
||||||
s := &Serializer{
|
s := &Serializer{
|
||||||
TimestampUnits: truncateDuration(cfg.TimestampUnits),
|
TimestampUnits: truncateDuration(cfg.TimestampUnits),
|
||||||
TimestampFormat: cfg.TimestampFormat,
|
TimestampFormat: cfg.TimestampFormat,
|
||||||
}
|
transformation: cfg.Transformation,
|
||||||
|
|
||||||
if cfg.Transformation != "" {
|
|
||||||
e, err := jsonata.Compile(cfg.Transformation)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.transformation = e
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
|
if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
|
||||||
|
|
@ -58,7 +51,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
var obj interface{}
|
var obj interface{}
|
||||||
obj = s.createObject(metric)
|
obj = s.createObject(metric)
|
||||||
|
|
||||||
if s.transformation != nil {
|
if s.transformation != "" {
|
||||||
var err error
|
var err error
|
||||||
if obj, err = s.transform(obj); err != nil {
|
if obj, err = s.transform(obj); err != nil {
|
||||||
if errors.Is(err, jsonata.ErrUndefined) {
|
if errors.Is(err, jsonata.ErrUndefined) {
|
||||||
|
|
@ -89,7 +82,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
"metrics": objects,
|
"metrics": objects,
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.transformation != nil {
|
if s.transformation != "" {
|
||||||
var err error
|
var err error
|
||||||
if obj, err = s.transform(obj); err != nil {
|
if obj, err = s.transform(obj); err != nil {
|
||||||
if errors.Is(err, jsonata.ErrUndefined) {
|
if errors.Is(err, jsonata.ErrUndefined) {
|
||||||
|
|
@ -150,7 +143,12 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) transform(obj interface{}) (interface{}, error) {
|
func (s *Serializer) transform(obj interface{}) (interface{}, error) {
|
||||||
return s.transformation.Eval(obj)
|
transformation, err := jsonata.Compile(s.transformation)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return transformation.Eval(obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func truncateDuration(units time.Duration) time.Duration {
|
func truncateDuration(units time.Duration) time.Duration {
|
||||||
|
|
|
||||||
|
|
@ -335,6 +335,67 @@ func TestSerializeTransformationBatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeTransformationIssue12734(t *testing.T) {
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"data",
|
||||||
|
map[string]string{"key": "a"},
|
||||||
|
map[string]interface{}{"value": 10.1},
|
||||||
|
time.Unix(0, 1676285135457000000),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"data",
|
||||||
|
map[string]string{"key": "b"},
|
||||||
|
map[string]interface{}{"value": 20.2},
|
||||||
|
time.Unix(0, 1676285135457000000),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"data",
|
||||||
|
map[string]string{"key": "c"},
|
||||||
|
map[string]interface{}{"value": 30.3},
|
||||||
|
time.Unix(0, 1676285135457000000),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
transformation := `
|
||||||
|
{
|
||||||
|
"valueRows": metrics{$string(timestamp): fields.value[]} ~> $each(function($v, $k) {
|
||||||
|
{
|
||||||
|
"timestamp": $number($k),
|
||||||
|
"values": $v
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
expected := map[string]interface{}{
|
||||||
|
"valueRows": map[string]interface{}{
|
||||||
|
"timestamp": 1.676285135e+9,
|
||||||
|
"values": []interface{}{10.1, 20.2, 30.3},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup serializer
|
||||||
|
serializer, err := NewSerializer(
|
||||||
|
FormatConfig{
|
||||||
|
Transformation: transformation,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check multiple serializations as issue #12734 shows that the
|
||||||
|
// transformation breaks after the first iteration
|
||||||
|
for i := 1; i <= 3; i++ {
|
||||||
|
buf, err := serializer.SerializeBatch(input)
|
||||||
|
require.NoErrorf(t, err, "broke in iteration %d", i)
|
||||||
|
|
||||||
|
// Compare
|
||||||
|
var actual interface{}
|
||||||
|
require.NoError(t, json.Unmarshal(buf, &actual))
|
||||||
|
require.EqualValuesf(t, expected, actual, "broke in iteration %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeNesting(t *testing.T) {
|
func TestSerializeNesting(t *testing.T) {
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
name string
|
name string
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue