feat(parsers.json): Allow JSONata based transformations in JSON serializer (#11251)
This commit is contained in:
parent
ffb06c2168
commit
9f3a7414a9
|
|
@ -1355,6 +1355,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
|
||||||
|
|
||||||
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
|
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
|
||||||
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
|
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
|
||||||
|
c.getFieldString(tbl, "json_transformation", &sc.Transformation)
|
||||||
|
|
||||||
c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
|
c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
|
||||||
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
|
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,7 @@ following works:
|
||||||
- github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt)
|
- github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt)
|
||||||
- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE)
|
- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE)
|
||||||
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
|
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
|
||||||
|
- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE)
|
||||||
- github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE)
|
- github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE)
|
||||||
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)
|
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)
|
||||||
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
|
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -41,6 +41,7 @@ require (
|
||||||
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.13.6
|
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.13.6
|
||||||
github.com/aws/smithy-go v1.11.3
|
github.com/aws/smithy-go v1.11.3
|
||||||
github.com/benbjohnson/clock v1.3.0
|
github.com/benbjohnson/clock v1.3.0
|
||||||
|
github.com/blues/jsonata-go v1.5.4
|
||||||
github.com/bmatcuk/doublestar/v3 v3.0.0
|
github.com/bmatcuk/doublestar/v3 v3.0.0
|
||||||
github.com/caio/go-tdigest v3.1.0+incompatible
|
github.com/caio/go-tdigest v3.1.0+incompatible
|
||||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c
|
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -439,6 +439,8 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO
|
||||||
github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI=
|
github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI=
|
||||||
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||||
|
github.com/blues/jsonata-go v1.5.4 h1:XCsXaVVMrt4lcpKeJw6mNJHqQpWU751cnHdCFUq3xd8=
|
||||||
|
github.com/blues/jsonata-go v1.5.4/go.mod h1:uns2jymDrnI7y+UFYCqsRTEiAH22GyHnNXrkupAVFWI=
|
||||||
github.com/bmatcuk/doublestar/v3 v3.0.0 h1:TQtVPlDnAYwcrVNB2JiGuMc++H5qzWZd9PhkNo5WyHI=
|
github.com/bmatcuk/doublestar/v3 v3.0.0 h1:TQtVPlDnAYwcrVNB2JiGuMc++H5qzWZd9PhkNo5WyHI=
|
||||||
github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3vsSYKPTd8AWA0k=
|
github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3vsSYKPTd8AWA0k=
|
||||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||||
|
|
|
||||||
|
|
@ -220,7 +220,7 @@ func (adx *AzureDataExplorer) Init() error {
|
||||||
return errors.New("Metrics grouping type is not valid")
|
return errors.New("Metrics grouping type is not valid")
|
||||||
}
|
}
|
||||||
|
|
||||||
serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano)
|
serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -134,7 +134,7 @@ func TestWrite(t *testing.T) {
|
||||||
|
|
||||||
for _, tC := range testCases {
|
for _, tC := range testCases {
|
||||||
t.Run(tC.name, func(t *testing.T) {
|
t.Run(tC.name, func(t *testing.T) {
|
||||||
serializer, err := telegrafJson.NewSerializer(time.Second, "")
|
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
plugin := AzureDataExplorer{
|
plugin := AzureDataExplorer{
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,8 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
|
||||||
/* End wrapper interface */
|
/* End wrapper interface */
|
||||||
|
|
||||||
func TestInitAndWrite(t *testing.T) {
|
func TestInitAndWrite(t *testing.T) {
|
||||||
serializer, _ := json.NewSerializer(time.Second, "")
|
serializer, err := json.NewSerializer(time.Second, "", "")
|
||||||
|
require.NoError(t, err)
|
||||||
mockHub := &mockEventHub{}
|
mockHub := &mockEventHub{}
|
||||||
e := &EventHubs{
|
e := &EventHubs{
|
||||||
Hub: mockHub,
|
Hub: mockHub,
|
||||||
|
|
@ -52,8 +53,7 @@ func TestInitAndWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mockHub.On("GetHub", mock.Anything).Return(nil).Once()
|
mockHub.On("GetHub", mock.Anything).Return(nil).Once()
|
||||||
err := e.Init()
|
require.NoError(t, e.Init())
|
||||||
require.NoError(t, err)
|
|
||||||
mockHub.AssertExpectations(t)
|
mockHub.AssertExpectations(t)
|
||||||
|
|
||||||
metrics := testutil.MockMetrics()
|
metrics := testutil.MockMetrics()
|
||||||
|
|
@ -100,8 +100,8 @@ func TestInitAndWriteIntegration(t *testing.T) {
|
||||||
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
|
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
|
||||||
|
|
||||||
// Configure the plugin to target the newly created hub
|
// Configure the plugin to target the newly created hub
|
||||||
serializer, _ := json.NewSerializer(time.Second, "")
|
serializer, err := json.NewSerializer(time.Second, "", "")
|
||||||
|
require.NoError(t, err)
|
||||||
e := &EventHubs{
|
e := &EventHubs{
|
||||||
Hub: &eventHub{},
|
Hub: &eventHub{},
|
||||||
ConnectionString: testHubCS,
|
ConnectionString: testHubCS,
|
||||||
|
|
@ -110,13 +110,11 @@ func TestInitAndWriteIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that we can connect to Event Hubs
|
// Verify that we can connect to Event Hubs
|
||||||
err = e.Init()
|
require.NoError(t, e.Init())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Verify that we can successfully write data to Event Hubs
|
// Verify that we can successfully write data to Event Hubs
|
||||||
metrics := testutil.MockMetrics()
|
metrics := testutil.MockMetrics()
|
||||||
err = e.Write(metrics)
|
require.NoError(t, e.Write(metrics))
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
** Verify we can read data back from the test hub
|
** Verify we can read data back from the test hub
|
||||||
|
|
|
||||||
|
|
@ -640,12 +640,11 @@ func TestBatchedUnbatched(t *testing.T) {
|
||||||
Method: defaultMethod,
|
Method: defaultMethod,
|
||||||
}
|
}
|
||||||
|
|
||||||
var s = map[string]serializers.Serializer{
|
jsonSerializer, err := json.NewSerializer(time.Second, "", "")
|
||||||
"influx": influx.NewSerializer(),
|
|
||||||
"json": func(s serializers.Serializer, err error) serializers.Serializer {
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return s
|
s := map[string]serializers.Serializer{
|
||||||
}(json.NewSerializer(time.Second, "")),
|
"influx": influx.NewSerializer(),
|
||||||
|
"json": jsonSerializer,
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, serializer := range s {
|
for name, serializer := range s {
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ func TestConnectAndWrite(t *testing.T) {
|
||||||
require.NoError(t, container.Terminate(), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
||||||
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss")
|
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st := &STOMP{
|
st := &STOMP{
|
||||||
Host: url,
|
Host: url,
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,11 @@ The `json` output data format converts metrics into JSON documents.
|
||||||
# layout specification from https://golang.org/pkg/time/#Time.Format
|
# layout specification from https://golang.org/pkg/time/#Time.Format
|
||||||
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
|
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
|
||||||
#json_timestamp_format = ""
|
#json_timestamp_format = ""
|
||||||
|
|
||||||
|
## A [JSONata](https://jsonata.org/) transformation of the JSON in [standard-form](#examples).
|
||||||
|
## This allows to generate an arbitrary output form based on the metric(s). Please use
|
||||||
|
## multiline strings (starting and ending with three single-quotes) if needed.
|
||||||
|
#json_transformation = ""
|
||||||
```
|
```
|
||||||
|
|
||||||
## Examples
|
## Examples
|
||||||
|
|
@ -84,3 +89,207 @@ reference the documentation for the specific plugin.
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Transformations
|
||||||
|
|
||||||
|
Transformations using the [JSONata standard](https://jsonata.org/) can be specified with
|
||||||
|
the `json_tansformation` parameter. The input to the transformation is the serialized
|
||||||
|
metric in the standard-form above.
|
||||||
|
|
||||||
|
**Note**: There is a difference in batch and non-batch serialization mode!
|
||||||
|
The former adds a `metrics` field containing the metric array, while the later
|
||||||
|
serializes the metric directly.
|
||||||
|
|
||||||
|
In the following sections, some rudimentary examples for transformations are shown.
|
||||||
|
For more elaborated JSONata expressions please consult the
|
||||||
|
[documentation](https://docs.jsonata.org) or the
|
||||||
|
[online playground](https://try.jsonata.org).
|
||||||
|
|
||||||
|
### Non-batch mode
|
||||||
|
|
||||||
|
In the following examples, we will use the following input to the transformation:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"fields": {
|
||||||
|
"field_1": 30,
|
||||||
|
"field_2": 4,
|
||||||
|
"field_N": 59,
|
||||||
|
"n_images": 660
|
||||||
|
},
|
||||||
|
"name": "docker",
|
||||||
|
"tags": {
|
||||||
|
"host": "raynor"
|
||||||
|
},
|
||||||
|
"timestamp": 1458229140
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
If you want to flatten the above metric, you can use
|
||||||
|
|
||||||
|
```json
|
||||||
|
$merge([{"name": name, "timestamp": timestamp}, tags, fields])
|
||||||
|
```
|
||||||
|
|
||||||
|
to get
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "docker",
|
||||||
|
"timestamp": 1458229140,
|
||||||
|
"host": "raynor",
|
||||||
|
"field_1": 30,
|
||||||
|
"field_2": 4,
|
||||||
|
"field_N": 59,
|
||||||
|
"n_images": 660
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
It is also possible to do arithmetics or renaming
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*),
|
||||||
|
"images": fields.n_images,
|
||||||
|
"host": tags.host,
|
||||||
|
"time": $fromMillis(timestamp*1000)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
will result in
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"capacity": 93,
|
||||||
|
"images": 660,
|
||||||
|
"host": "raynor",
|
||||||
|
"time": "2016-03-17T15:39:00.000Z"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Batch mode
|
||||||
|
|
||||||
|
When an output plugin emits multiple metrics in a batch fashion it might be usefull
|
||||||
|
to restructure and/or combine the metric elements. We will use the following input
|
||||||
|
example in this section
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"metrics": [
|
||||||
|
{
|
||||||
|
"fields": {
|
||||||
|
"field_1": 30,
|
||||||
|
"field_2": 4,
|
||||||
|
"field_N": 59,
|
||||||
|
"n_images": 660
|
||||||
|
},
|
||||||
|
"name": "docker",
|
||||||
|
"tags": {
|
||||||
|
"host": "raynor"
|
||||||
|
},
|
||||||
|
"timestamp": 1458229140
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fields": {
|
||||||
|
"field_1": 12,
|
||||||
|
"field_2": 43,
|
||||||
|
"field_3": 0,
|
||||||
|
"field_4": 5,
|
||||||
|
"field_5": 7,
|
||||||
|
"field_N": 27,
|
||||||
|
"n_images": 72
|
||||||
|
},
|
||||||
|
"name": "docker",
|
||||||
|
"tags": {
|
||||||
|
"host": "amaranth"
|
||||||
|
},
|
||||||
|
"timestamp": 1458229140
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fields": {
|
||||||
|
"field_1": 5,
|
||||||
|
"field_N": 34,
|
||||||
|
"n_images": 0
|
||||||
|
},
|
||||||
|
"name": "storage",
|
||||||
|
"tags": {
|
||||||
|
"host": "amaranth"
|
||||||
|
},
|
||||||
|
"timestamp": 1458229140
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
We can do the same computation as above, iterating over the metrics
|
||||||
|
|
||||||
|
```json
|
||||||
|
metrics.{
|
||||||
|
"capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*),
|
||||||
|
"images": fields.n_images,
|
||||||
|
"service": (name & "(" & tags.host & ")"),
|
||||||
|
"time": $fromMillis(timestamp*1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
resulting in
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"capacity": 93,
|
||||||
|
"images": 660,
|
||||||
|
"service": "docker(raynor)",
|
||||||
|
"time": "2016-03-17T15:39:00.000Z"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"capacity": 94,
|
||||||
|
"images": 72,
|
||||||
|
"service": "docker(amaranth)",
|
||||||
|
"time": "2016-03-17T15:39:00.000Z"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"capacity": 39,
|
||||||
|
"images": 0,
|
||||||
|
"service": "storage(amaranth)",
|
||||||
|
"time": "2016-03-17T15:39:00.000Z"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
However, the more interesting use-case is to restructure and **combine** the metrics, e.g. by grouping by `host`
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"time": $min(metrics.timestamp) * 1000 ~> $fromMillis(),
|
||||||
|
"images": metrics{
|
||||||
|
tags.host: {
|
||||||
|
name: fields.n_images
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"capacity alerts": metrics[fields.n_images < 10].[(tags.host & " " & name)]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
resulting in
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"time": "2016-03-17T15:39:00.000Z",
|
||||||
|
"images": {
|
||||||
|
"raynor": {
|
||||||
|
"docker": 660
|
||||||
|
},
|
||||||
|
"amaranth": {
|
||||||
|
"docker": 72,
|
||||||
|
"storage": 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"capacity alerts": [
|
||||||
|
"amaranth storage"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Please consult the JSONata documentation for more examples and details.
|
||||||
|
|
|
||||||
|
|
@ -2,28 +2,55 @@ package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
jsonata "github.com/blues/jsonata-go"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
TimestampUnits time.Duration
|
TimestampUnits time.Duration
|
||||||
TimestampFormat string
|
TimestampFormat string
|
||||||
|
|
||||||
|
transformation *jsonata.Expr
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSerializer(timestampUnits time.Duration, timestampFormat string) (*Serializer, error) {
|
func NewSerializer(timestampUnits time.Duration, timestampFormat, transform string) (*Serializer, error) {
|
||||||
s := &Serializer{
|
s := &Serializer{
|
||||||
TimestampUnits: truncateDuration(timestampUnits),
|
TimestampUnits: truncateDuration(timestampUnits),
|
||||||
TimestampFormat: timestampFormat,
|
TimestampFormat: timestampFormat,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if transform != "" {
|
||||||
|
e, err := jsonata.Compile(transform)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.transformation = e
|
||||||
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
m := s.createObject(metric)
|
var obj interface{}
|
||||||
serialized, err := json.Marshal(m)
|
obj = s.createObject(metric)
|
||||||
|
|
||||||
|
if s.transformation != nil {
|
||||||
|
var err error
|
||||||
|
if obj, err = s.transform(obj); err != nil {
|
||||||
|
if errors.Is(err, jsonata.ErrUndefined) {
|
||||||
|
return nil, fmt.Errorf("%v (maybe configured for batch mode?)", err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serialized, err := json.Marshal(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return []byte{}, err
|
||||||
}
|
}
|
||||||
|
|
@ -39,10 +66,21 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
objects = append(objects, m)
|
objects = append(objects, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := map[string]interface{}{
|
var obj interface{}
|
||||||
|
obj = map[string]interface{}{
|
||||||
"metrics": objects,
|
"metrics": objects,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.transformation != nil {
|
||||||
|
var err error
|
||||||
|
if obj, err = s.transform(obj); err != nil {
|
||||||
|
if errors.Is(err, jsonata.ErrUndefined) {
|
||||||
|
return nil, fmt.Errorf("%v (maybe configured for non-batch mode?)", err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
serialized, err := json.Marshal(obj)
|
serialized, err := json.Marshal(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return []byte{}, err
|
||||||
|
|
@ -80,6 +118,10 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Serializer) transform(obj interface{}) (interface{}, error) {
|
||||||
|
return s.transformation.Eval(obj)
|
||||||
|
}
|
||||||
|
|
||||||
func truncateDuration(units time.Duration) time.Duration {
|
func truncateDuration(units time.Duration) time.Duration {
|
||||||
// Default precision is 1s
|
// Default precision is 1s
|
||||||
if units <= 0 {
|
if units <= 0 {
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,21 @@
|
||||||
package json
|
package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/toml"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -23,8 +29,8 @@ func TestSerializeMetricFloat(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
var buf []byte
|
require.NoError(t, err)
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expS := []byte(fmt.Sprintf(`{"fields":{"usage_idle":91.5},"name":"cpu","tags":{"cpu":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n")
|
expS := []byte(fmt.Sprintf(`{"fields":{"usage_idle":91.5},"name":"cpu","tags":{"cpu":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n")
|
||||||
|
|
@ -84,7 +90,8 @@ func TestSerialize_TimestampUnits(t *testing.T) {
|
||||||
},
|
},
|
||||||
time.Unix(1525478795, 123456789),
|
time.Unix(1525478795, 123456789),
|
||||||
)
|
)
|
||||||
s, _ := NewSerializer(tt.timestampUnits, tt.timestampFormat)
|
s, err := NewSerializer(tt.timestampUnits, tt.timestampFormat, "")
|
||||||
|
require.NoError(t, err)
|
||||||
actual, err := s.Serialize(m)
|
actual, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tt.expected+"\n", string(actual))
|
require.Equal(t, tt.expected+"\n", string(actual))
|
||||||
|
|
@ -102,8 +109,8 @@ func TestSerializeMetricInt(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
var buf []byte
|
require.NoError(t, err)
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -121,8 +128,8 @@ func TestSerializeMetricString(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
var buf []byte
|
require.NoError(t, err)
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -141,8 +148,8 @@ func TestSerializeMultiFields(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
var buf []byte
|
require.NoError(t, err)
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -160,7 +167,8 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("My CPU", tags, fields, now)
|
m := metric.New("My CPU", tags, fields, now)
|
||||||
|
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
|
require.NoError(t, err)
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -179,7 +187,8 @@ func TestSerializeBatch(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
metrics := []telegraf.Metric{m, m}
|
metrics := []telegraf.Metric{m, m}
|
||||||
s, _ := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
|
require.NoError(t, err)
|
||||||
buf, err := s.SerializeBatch(metrics)
|
buf, err := s.SerializeBatch(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
|
require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
|
||||||
|
|
@ -198,7 +207,7 @@ func TestSerializeBatchSkipInf(t *testing.T) {
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
buf, err := s.SerializeBatch(metrics)
|
buf, err := s.SerializeBatch(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -217,9 +226,131 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) {
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := NewSerializer(0, "")
|
s, err := NewSerializer(0, "", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
buf, err := s.SerializeBatch(metrics)
|
buf, err := s.SerializeBatch(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, []byte(`{"metrics":[{"fields":{},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
|
require.Equal(t, []byte(`{"metrics":[{"fields":{},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeTransformationNonBatch(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
filename string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "non-batch transformation test",
|
||||||
|
filename: "testcases/transformation_single.conf",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
filename := filepath.FromSlash(tt.filename)
|
||||||
|
cfg, header, err := loadTestConfiguration(filename)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Get the input metrics
|
||||||
|
metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Get the expectations
|
||||||
|
expectedArray, err := loadJSON(strings.TrimSuffix(filename, ".conf") + "_out.json")
|
||||||
|
require.NoError(t, err)
|
||||||
|
expected := expectedArray.([]interface{})
|
||||||
|
|
||||||
|
// Serialize
|
||||||
|
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, m := range metrics {
|
||||||
|
buf, err := serializer.Serialize(m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Compare
|
||||||
|
var actual interface{}
|
||||||
|
require.NoError(t, json.Unmarshal(buf, &actual))
|
||||||
|
fmt.Printf("actual: %v\n", actual)
|
||||||
|
fmt.Printf("expected: %v\n", expected[i])
|
||||||
|
require.EqualValuesf(t, expected[i], actual, "mismatch in %d", i)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeTransformationBatch(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
filename string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "batch transformation test",
|
||||||
|
filename: "testcases/transformation_batch.conf",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
filename := filepath.FromSlash(tt.filename)
|
||||||
|
cfg, header, err := loadTestConfiguration(filename)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Get the input metrics
|
||||||
|
metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Get the expectations
|
||||||
|
expected, err := loadJSON(strings.TrimSuffix(filename, ".conf") + "_out.json")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Serialize
|
||||||
|
serializer, err := NewSerializer(cfg.TimestampUnits, cfg.TimestampFormat, cfg.Transformation)
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf, err := serializer.SerializeBatch(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Compare
|
||||||
|
var actual interface{}
|
||||||
|
require.NoError(t, json.Unmarshal(buf, &actual))
|
||||||
|
require.EqualValues(t, expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
TimestampUnits time.Duration `toml:"json_timestamp_units"`
|
||||||
|
TimestampFormat string `toml:"json_timestamp_format"`
|
||||||
|
Transformation string `toml:"json_transformation"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadTestConfiguration(filename string) (*Config, []string, error) {
|
||||||
|
buf, err := os.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
header := make([]string, 0)
|
||||||
|
for _, line := range strings.Split(string(buf), "\n") {
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if strings.HasPrefix(line, "#") {
|
||||||
|
header = append(header, line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var cfg Config
|
||||||
|
err = toml.Unmarshal(buf, &cfg)
|
||||||
|
return &cfg, header, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadJSON(filename string) (interface{}, error) {
|
||||||
|
buf, err := os.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var data interface{}
|
||||||
|
err = json.Unmarshal(buf, &data)
|
||||||
|
return data, err
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
# Example for transforming the output JSON with batch metrics.
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
|
||||||
|
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
|
||||||
|
|
||||||
|
json_transformation = '''
|
||||||
|
metrics.{
|
||||||
|
"sdkVersion": tags.sdkver,
|
||||||
|
"time": timestamp,
|
||||||
|
"platform": platform,
|
||||||
|
"key": tags.key,
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": timestamp,
|
||||||
|
"flag": tags.flagname,
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": tags.value,
|
||||||
|
"type": $uppercase(name),
|
||||||
|
"count": fields.count_sum
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"sdkVersion": "4.9.1",
|
||||||
|
"time": 1653643420,
|
||||||
|
"key": "12345",
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": 1653643420,
|
||||||
|
"flag": "F5",
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": "false",
|
||||||
|
"type": "IMPRESSION",
|
||||||
|
"count": 5
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"sdkVersion": "1.18.3",
|
||||||
|
"time": 1653646789,
|
||||||
|
"key": "67890",
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": 1653646789,
|
||||||
|
"flag": "E42",
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": "true",
|
||||||
|
"type": "EXPRESSION",
|
||||||
|
"count": 42
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
# Example for transforming the output JSON in non-batch mode.
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
|
||||||
|
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
|
||||||
|
|
||||||
|
json_transformation = '''
|
||||||
|
{
|
||||||
|
"sdkVersion": tags.sdkver,
|
||||||
|
"time": timestamp,
|
||||||
|
"platform": platform,
|
||||||
|
"key": tags.key,
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": timestamp,
|
||||||
|
"flag": tags.flagname,
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": tags.value,
|
||||||
|
"type": $uppercase(name),
|
||||||
|
"count": fields.count_sum
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"sdkVersion": "4.9.1",
|
||||||
|
"time": 1653643420,
|
||||||
|
"key": "12345",
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": 1653643420,
|
||||||
|
"flag": "F5",
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": "false",
|
||||||
|
"type": "IMPRESSION",
|
||||||
|
"count": 5
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"sdkVersion": "1.18.3",
|
||||||
|
"time": 1653646789,
|
||||||
|
"key": "67890",
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"time": 1653646789,
|
||||||
|
"flag": "E42",
|
||||||
|
"experimentVersion": 0,
|
||||||
|
"value": "true",
|
||||||
|
"type": "EXPRESSION",
|
||||||
|
"count": 42
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
@ -101,6 +101,9 @@ type Config struct {
|
||||||
// Timestamp format to use for JSON and CSV formatted output
|
// Timestamp format to use for JSON and CSV formatted output
|
||||||
TimestampFormat string `toml:"timestamp_format"`
|
TimestampFormat string `toml:"timestamp_format"`
|
||||||
|
|
||||||
|
// Transformation as JSONata expression to use for JSON formatted output
|
||||||
|
Transformation string `toml:"transformation"`
|
||||||
|
|
||||||
// Include HEC routing fields for splunkmetric output
|
// Include HEC routing fields for splunkmetric output
|
||||||
HecRouting bool `toml:"hec_routing"`
|
HecRouting bool `toml:"hec_routing"`
|
||||||
|
|
||||||
|
|
@ -141,7 +144,7 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
case "graphite":
|
case "graphite":
|
||||||
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates)
|
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates)
|
||||||
case "json":
|
case "json":
|
||||||
serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat)
|
serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat, config.Transformation)
|
||||||
case "splunkmetric":
|
case "splunkmetric":
|
||||||
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric)
|
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric)
|
||||||
case "nowmetric":
|
case "nowmetric":
|
||||||
|
|
@ -210,8 +213,8 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri
|
||||||
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
|
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewJSONSerializer(timestampUnits time.Duration, timestampFormat string) (Serializer, error) {
|
func NewJSONSerializer(timestampUnits time.Duration, timestampFormat, transform string) (Serializer, error) {
|
||||||
return json.NewSerializer(timestampUnits, timestampFormat)
|
return json.NewSerializer(timestampUnits, timestampFormat, transform)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
|
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue