diff --git a/config/config.go b/config/config.go index e17ba081f..7f4fb0d2c 100644 --- a/config/config.go +++ b/config/config.go @@ -1355,6 +1355,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits) c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat) + c.getFieldString(tbl, "json_transformation", &sc.Transformation) c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting) c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index b43ff108f..ed533ce6a 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -68,6 +68,7 @@ following works: - github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt) - github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE) - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) +- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE) - github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE) - github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 4b646c44e..1ca758b01 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.13.6 github.com/aws/smithy-go v1.11.3 github.com/benbjohnson/clock v1.3.0 + github.com/blues/jsonata-go v1.5.4 github.com/bmatcuk/doublestar/v3 v3.0.0 github.com/caio/go-tdigest v3.1.0+incompatible github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c diff --git 
a/go.sum b/go.sum index cce037676..a3ca8414b 100644 --- a/go.sum +++ b/go.sum @@ -439,6 +439,8 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= +github.com/blues/jsonata-go v1.5.4 h1:XCsXaVVMrt4lcpKeJw6mNJHqQpWU751cnHdCFUq3xd8= +github.com/blues/jsonata-go v1.5.4/go.mod h1:uns2jymDrnI7y+UFYCqsRTEiAH22GyHnNXrkupAVFWI= github.com/bmatcuk/doublestar/v3 v3.0.0 h1:TQtVPlDnAYwcrVNB2JiGuMc++H5qzWZd9PhkNo5WyHI= github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3vsSYKPTd8AWA0k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 2e00bbda3..abfe47b21 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -220,7 +220,7 @@ func (adx *AzureDataExplorer) Init() error { return errors.New("Metrics grouping type is not valid") } - serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano) + serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "") if err != nil { return err } diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index ce53acf43..83a09fa4f 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -134,7 +134,7 @@ func TestWrite(t *testing.T) { for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer, err := 
telegrafJson.NewSerializer(time.Second, "") + serializer, err := telegrafJson.NewSerializer(time.Second, "", "") require.NoError(t, err) plugin := AzureDataExplorer{ diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 9b17aef60..788491e61 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -42,7 +42,8 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt /* End wrapper interface */ func TestInitAndWrite(t *testing.T) { - serializer, _ := json.NewSerializer(time.Second, "") + serializer, err := json.NewSerializer(time.Second, "", "") + require.NoError(t, err) mockHub := &mockEventHub{} e := &EventHubs{ Hub: mockHub, @@ -52,8 +53,7 @@ func TestInitAndWrite(t *testing.T) { } mockHub.On("GetHub", mock.Anything).Return(nil).Once() - err := e.Init() - require.NoError(t, err) + require.NoError(t, e.Init()) mockHub.AssertExpectations(t) metrics := testutil.MockMetrics() @@ -100,8 +100,8 @@ func TestInitAndWriteIntegration(t *testing.T) { testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name // Configure the plugin to target the newly created hub - serializer, _ := json.NewSerializer(time.Second, "") - + serializer, err := json.NewSerializer(time.Second, "", "") + require.NoError(t, err) e := &EventHubs{ Hub: &eventHub{}, ConnectionString: testHubCS, @@ -110,13 +110,11 @@ func TestInitAndWriteIntegration(t *testing.T) { } // Verify that we can connect to Event Hubs - err = e.Init() - require.NoError(t, err) + require.NoError(t, e.Init()) // Verify that we can successfully write data to Event Hubs metrics := testutil.MockMetrics() - err = e.Write(metrics) - require.NoError(t, err) + require.NoError(t, e.Write(metrics)) /* ** Verify we can read data back from the test hub diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index be3b78833..fd03efa78 100644 --- 
a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -640,12 +640,11 @@ func TestBatchedUnbatched(t *testing.T) { Method: defaultMethod, } - var s = map[string]serializers.Serializer{ + jsonSerializer, err := json.NewSerializer(time.Second, "", "") + require.NoError(t, err) + s := map[string]serializers.Serializer{ "influx": influx.NewSerializer(), - "json": func(s serializers.Serializer, err error) serializers.Serializer { - require.NoError(t, err) - return s - }(json.NewSerializer(time.Second, "")), + "json": jsonSerializer, } for name, serializer := range s { diff --git a/plugins/outputs/stomp/stomp_test.go b/plugins/outputs/stomp/stomp_test.go index 541adfc61..41c6b3151 100644 --- a/plugins/outputs/stomp/stomp_test.go +++ b/plugins/outputs/stomp/stomp_test.go @@ -31,7 +31,7 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, container.Terminate(), "terminating container failed") }() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss") + s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "") require.NoError(t, err) st := &STOMP{ Host: url, diff --git a/plugins/serializers/json/README.md b/plugins/serializers/json/README.md index 2bbe8dad9..67e36df8e 100644 --- a/plugins/serializers/json/README.md +++ b/plugins/serializers/json/README.md @@ -26,6 +26,11 @@ The `json` output data format converts metrics into JSON documents. # layout specification from https://golang.org/pkg/time/#Time.Format # e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00" #json_timestamp_format = "" + + ## A [JSONata](https://jsonata.org/) transformation of the JSON in [standard-form](#examples). + ## This allows to generate an arbitrary output form based on the metric(s). Please use + ## multiline strings (starting and ending with three single-quotes) if needed. 
+ #json_transformation = "" ``` ## Examples @@ -84,3 +89,207 @@ reference the documentation for the specific plugin. ] } ``` + +## Transformations + +Transformations using the [JSONata standard](https://jsonata.org/) can be specified with +the `json_transformation` parameter. The input to the transformation is the serialized +metric in the standard-form above. + +**Note**: There is a difference in batch and non-batch serialization mode! +The former adds a `metrics` field containing the metric array, while the latter +serializes the metric directly. + +In the following sections, some rudimentary examples for transformations are shown. +For more elaborate JSONata expressions please consult the +[documentation](https://docs.jsonata.org) or the +[online playground](https://try.jsonata.org). + +### Non-batch mode + +In the following examples, we will use the following input to the transformation: + +```json +{ + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 +} +``` + +If you want to flatten the above metric, you can use + +```json +$merge([{"name": name, "timestamp": timestamp}, tags, fields]) +``` + +to get + +```json +{ + "name": "docker", + "timestamp": 1458229140, + "host": "raynor", + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 +} +``` + +It is also possible to do arithmetic or renaming + +```json +{ + "capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*), + "images": fields.n_images, + "host": tags.host, + "time": $fromMillis(timestamp*1000) +} +``` + +will result in + +```json +{ + "capacity": 93, + "images": 660, + "host": "raynor", + "time": "2016-03-17T15:39:00.000Z" +} +``` + +### Batch mode + +When an output plugin emits multiple metrics in a batch fashion it might be useful +to restructure and/or combine the metric elements. 
We will use the following input +example in this section + +```json +{ + "metrics": [ + { + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 + }, + { + "fields": { + "field_1": 12, + "field_2": 43, + "field_3": 0, + "field_4": 5, + "field_5": 7, + "field_N": 27, + "n_images": 72 + }, + "name": "docker", + "tags": { + "host": "amaranth" + }, + "timestamp": 1458229140 + }, + { + "fields": { + "field_1": 5, + "field_N": 34, + "n_images": 0 + }, + "name": "storage", + "tags": { + "host": "amaranth" + }, + "timestamp": 1458229140 + } + ] +} +``` + +We can do the same computation as above, iterating over the metrics + +```json +metrics.{ + "capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*), + "images": fields.n_images, + "service": (name & "(" & tags.host & ")"), + "time": $fromMillis(timestamp*1000) +} + +``` + +resulting in + +```json +[ + { + "capacity": 93, + "images": 660, + "service": "docker(raynor)", + "time": "2016-03-17T15:39:00.000Z" + }, + { + "capacity": 94, + "images": 72, + "service": "docker(amaranth)", + "time": "2016-03-17T15:39:00.000Z" + }, + { + "capacity": 39, + "images": 0, + "service": "storage(amaranth)", + "time": "2016-03-17T15:39:00.000Z" + } +] +``` + +However, the more interesting use-case is to restructure and **combine** the metrics, e.g. by grouping by `host` + +```json +{ + "time": $min(metrics.timestamp) * 1000 ~> $fromMillis(), + "images": metrics{ + tags.host: { + name: fields.n_images + } + }, + "capacity alerts": metrics[fields.n_images < 10].[(tags.host & " " & name)] +} +``` + +resulting in + +```json +{ + "time": "2016-03-17T15:39:00.000Z", + "images": { + "raynor": { + "docker": 660 + }, + "amaranth": { + "docker": 72, + "storage": 0 + } + }, + "capacity alerts": [ + "amaranth storage" + ] +} +``` + +Please consult the JSONata documentation for more examples and details. 
diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index a000a1c62..222be9ea2 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -2,28 +2,55 @@ package json import ( "encoding/json" + "errors" + "fmt" "math" "time" + jsonata "github.com/blues/jsonata-go" + "github.com/influxdata/telegraf" ) type Serializer struct { TimestampUnits time.Duration TimestampFormat string + + transformation *jsonata.Expr } -func NewSerializer(timestampUnits time.Duration, timestampFormat string) (*Serializer, error) { +func NewSerializer(timestampUnits time.Duration, timestampFormat, transform string) (*Serializer, error) { s := &Serializer{ TimestampUnits: truncateDuration(timestampUnits), TimestampFormat: timestampFormat, } + + if transform != "" { + e, err := jsonata.Compile(transform) + if err != nil { + return nil, err + } + s.transformation = e + } + return s, nil } func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { - m := s.createObject(metric) - serialized, err := json.Marshal(m) + var obj interface{} + obj = s.createObject(metric) + + if s.transformation != nil { + var err error + if obj, err = s.transform(obj); err != nil { + if errors.Is(err, jsonata.ErrUndefined) { + return nil, fmt.Errorf("%v (maybe configured for batch mode?)", err) + } + return nil, err + } + } + + serialized, err := json.Marshal(obj) if err != nil { return []byte{}, err } @@ -39,10 +66,21 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { objects = append(objects, m) } - obj := map[string]interface{}{ + var obj interface{} + obj = map[string]interface{}{ "metrics": objects, } + if s.transformation != nil { + var err error + if obj, err = s.transform(obj); err != nil { + if errors.Is(err, jsonata.ErrUndefined) { + return nil, fmt.Errorf("%v (maybe configured for non-batch mode?)", err) + } + return nil, err + } + } + serialized, err := json.Marshal(obj) if err != nil { return []byte{}, 
err @@ -80,6 +118,10 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} return m } +func (s *Serializer) transform(obj interface{}) (interface{}, error) { + return s.transformation.Eval(obj) +} + func truncateDuration(units time.Duration) time.Duration { // Default precision is 1s if units <= 0 { diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index eaecb9603..5b6ad9c15 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -1,15 +1,21 @@ package json import ( + "encoding/json" "fmt" "math" + "os" + "path/filepath" + "strings" "testing" "time" + "github.com/influxdata/toml" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -23,8 +29,8 @@ func TestSerializeMetricFloat(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0, "") - var buf []byte + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) expS := []byte(fmt.Sprintf(`{"fields":{"usage_idle":91.5},"name":"cpu","tags":{"cpu":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n") @@ -84,7 +90,8 @@ func TestSerialize_TimestampUnits(t *testing.T) { }, time.Unix(1525478795, 123456789), ) - s, _ := NewSerializer(tt.timestampUnits, tt.timestampFormat) + s, err := NewSerializer(tt.timestampUnits, tt.timestampFormat, "") + require.NoError(t, err) actual, err := s.Serialize(m) require.NoError(t, err) require.Equal(t, tt.expected+"\n", string(actual)) @@ -102,8 +109,8 @@ func TestSerializeMetricInt(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0, "") - var buf []byte + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -121,8 +128,8 @@ func 
TestSerializeMetricString(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0, "") - var buf []byte + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -141,8 +148,8 @@ func TestSerializeMultiFields(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, _ := NewSerializer(0, "") - var buf []byte + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -160,7 +167,8 @@ func TestSerializeMetricWithEscapes(t *testing.T) { } m := metric.New("My CPU", tags, fields, now) - s, _ := NewSerializer(0, "") + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.Serialize(m) require.NoError(t, err) @@ -179,7 +187,8 @@ func TestSerializeBatch(t *testing.T) { ) metrics := []telegraf.Metric{m, m} - s, _ := NewSerializer(0, "") + s, err := NewSerializer(0, "", "") + require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf) @@ -198,7 +207,7 @@ func TestSerializeBatchSkipInf(t *testing.T) { ), } - s, err := NewSerializer(0, "") + s, err := NewSerializer(0, "", "") require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) @@ -217,9 +226,131 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) { ), } - s, err := NewSerializer(0, "") + s, err := NewSerializer(0, "", "") require.NoError(t, err) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte(`{"metrics":[{"fields":{},"name":"cpu","tags":{},"timestamp":0}]}`), buf) } + +func TestSerializeTransformationNonBatch(t *testing.T) { + var tests = []struct { + name string + filename string + }{ + { + name: "non-batch transformation test", + filename: "testcases/transformation_single.conf", + }, + 
} + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filename := filepath.FromSlash(tt.filename) + cfg, header, err := loadTestConfiguration(filename) + require.NoError(t, err) + + // Get the input metrics + metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser) + require.NoError(t, err) + + // Get the expectations + expectedArray, err := loadJSON(strings.TrimSuffix(filename, ".conf") + "_out.json") + require.NoError(t, err) + expected := expectedArray.([]interface{}) + + // Serialize + serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation) + require.NoError(t, err) + for i, m := range metrics { + buf, err := serializer.Serialize(m) + require.NoError(t, err) + + // Compare + var actual interface{} + require.NoError(t, json.Unmarshal(buf, &actual)) + fmt.Printf("actual: %v\n", actual) + fmt.Printf("expected: %v\n", expected[i]) + require.EqualValuesf(t, expected[i], actual, "mismatch in %d", i) + } + }) + } +} + +func TestSerializeTransformationBatch(t *testing.T) { + var tests = []struct { + name string + filename string + }{ + { + name: "batch transformation test", + filename: "testcases/transformation_batch.conf", + }, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filename := filepath.FromSlash(tt.filename) + cfg, header, err := loadTestConfiguration(filename) + require.NoError(t, err) + + // Get the input metrics + metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser) + require.NoError(t, err) + + // Get the expectations + expected, err := loadJSON(strings.TrimSuffix(filename, ".conf") + "_out.json") + require.NoError(t, err) + + // Serialize + serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation) + require.NoError(t, err) + buf, err := serializer.SerializeBatch(metrics) + 
require.NoError(t, err) + + // Compare + var actual interface{} + require.NoError(t, json.Unmarshal(buf, &actual)) + require.EqualValues(t, expected, actual) + }) + } +} + +type Config struct { + TimestampUnits time.Duration `toml:"json_timestamp_units"` + TimestampFormat string `toml:"json_timestamp_format"` + Transformation string `toml:"json_transformation"` +} + +func loadTestConfiguration(filename string) (*Config, []string, error) { + buf, err := os.ReadFile(filename) + if err != nil { + return nil, nil, err + } + + header := make([]string, 0) + for _, line := range strings.Split(string(buf), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "#") { + header = append(header, line) + } + } + var cfg Config + err = toml.Unmarshal(buf, &cfg) + return &cfg, header, err +} + +func loadJSON(filename string) (interface{}, error) { + buf, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + var data interface{} + err = json.Unmarshal(buf, &data) + return data, err +} diff --git a/plugins/serializers/json/testcases/transformation_batch.conf b/plugins/serializers/json/testcases/transformation_batch.conf new file mode 100644 index 000000000..c0f48d5e0 --- /dev/null +++ b/plugins/serializers/json/testcases/transformation_batch.conf @@ -0,0 +1,24 @@ +# Example for transforming the output JSON with batch metrics. 
+# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +json_transformation = ''' +metrics.{ + "sdkVersion": tags.sdkver, + "time": timestamp, + "platform": platform, + "key": tags.key, + "events": [ + { + "time": timestamp, + "flag": tags.flagname, + "experimentVersion": 0, + "value": tags.value, + "type": $uppercase(name), + "count": fields.count_sum + } + ] +} +''' \ No newline at end of file diff --git a/plugins/serializers/json/testcases/transformation_batch_out.json b/plugins/serializers/json/testcases/transformation_batch_out.json new file mode 100644 index 000000000..06e8abc81 --- /dev/null +++ b/plugins/serializers/json/testcases/transformation_batch_out.json @@ -0,0 +1,32 @@ +[ + { + "sdkVersion": "4.9.1", + "time": 1653643420, + "key": "12345", + "events": [ + { + "time": 1653643420, + "flag": "F5", + "experimentVersion": 0, + "value": "false", + "type": "IMPRESSION", + "count": 5 + } + ] + }, + { + "sdkVersion": "1.18.3", + "time": 1653646789, + "key": "67890", + "events": [ + { + "time": 1653646789, + "flag": "E42", + "experimentVersion": 0, + "value": "true", + "type": "EXPRESSION", + "count": 42 + } + ] + } +] \ No newline at end of file diff --git a/plugins/serializers/json/testcases/transformation_single.conf b/plugins/serializers/json/testcases/transformation_single.conf new file mode 100644 index 000000000..ca5ea3626 --- /dev/null +++ b/plugins/serializers/json/testcases/transformation_single.conf @@ -0,0 +1,24 @@ +# Example for transforming the output JSON in non-batch mode. 
+# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +json_transformation = ''' +{ + "sdkVersion": tags.sdkver, + "time": timestamp, + "platform": platform, + "key": tags.key, + "events": [ + { + "time": timestamp, + "flag": tags.flagname, + "experimentVersion": 0, + "value": tags.value, + "type": $uppercase(name), + "count": fields.count_sum + } + ] +} +''' \ No newline at end of file diff --git a/plugins/serializers/json/testcases/transformation_single_out.json b/plugins/serializers/json/testcases/transformation_single_out.json new file mode 100644 index 000000000..06e8abc81 --- /dev/null +++ b/plugins/serializers/json/testcases/transformation_single_out.json @@ -0,0 +1,32 @@ +[ + { + "sdkVersion": "4.9.1", + "time": 1653643420, + "key": "12345", + "events": [ + { + "time": 1653643420, + "flag": "F5", + "experimentVersion": 0, + "value": "false", + "type": "IMPRESSION", + "count": 5 + } + ] + }, + { + "sdkVersion": "1.18.3", + "time": 1653646789, + "key": "67890", + "events": [ + { + "time": 1653646789, + "flag": "E42", + "experimentVersion": 0, + "value": "true", + "type": "EXPRESSION", + "count": 42 + } + ] + } +] \ No newline at end of file diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 603b6bd73..9fefef2e4 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -101,6 +101,9 @@ type Config struct { // Timestamp format to use for JSON and CSV formatted output TimestampFormat string `toml:"timestamp_format"` + // Transformation as JSONata expression to use for JSON formatted output + Transformation string `toml:"transformation"` + // Include HEC routing fields for splunkmetric output HecRouting bool `toml:"hec_routing"` @@ -141,7 +144,7 @@ func NewSerializer(config *Config) (Serializer, 
error) { case "graphite": serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates) case "json": - serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat) + serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat, config.Transformation) case "splunkmetric": serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric) case "nowmetric": @@ -210,8 +213,8 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions) } -func NewJSONSerializer(timestampUnits time.Duration, timestampFormat string) (Serializer, error) { - return json.NewSerializer(timestampUnits, timestampFormat) +func NewJSONSerializer(timestampUnits time.Duration, timestampFormat, transform string) (Serializer, error) { + return json.NewSerializer(timestampUnits, timestampFormat, transform) } func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {