diff --git a/config/config.go b/config/config.go index ee52d2d5e..d673340ac 100644 --- a/config/config.go +++ b/config/config.go @@ -1481,12 +1481,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport) - c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits) - c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat) - c.getFieldString(tbl, "json_transformation", &sc.Transformation) - c.getFieldStringSlice(tbl, "json_nested_fields_include", &sc.JSONNestedFieldInclude) - c.getFieldStringSlice(tbl, "json_nested_fields_exclude", &sc.JSONNestedFieldExclude) - c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting) c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric) c.getFieldBool(tbl, "splunkmetric_omit_event_tag", &sc.SplunkmetricOmitEventTag) @@ -1566,9 +1560,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Serializer options to ignore case "prefix", "template", "templates", - "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", - "json_timestamp_format", "json_timestamp_units", "json_transformation", - "json_nested_fields_include", "json_nested_fields_exclude", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", "prometheus_compact_encoding", "splunkmetric_hec_routing", "splunkmetric_multimetric", "splunkmetric_omit_event_tag", diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 9ae5e4e1b..f09723415 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -248,11 +248,11 @@ func (adx *AzureDataExplorer) Init() error { return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) } - serializer, err := json.NewSerializer(json.FormatConfig{ + serializer := &json.Serializer{ TimestampUnits: time.Nanosecond, TimestampFormat: time.RFC3339Nano, - }) - if err != nil { + } + if err := serializer.Init(); err != nil { return err } adx.serializer = serializer diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index fcc3b5924..6c39dc995 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -105,11 +105,10 @@ func TestWrite(t *testing.T) { for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer, err := telegrafJson.NewSerializer( - telegrafJson.FormatConfig{ - TimestampUnits: time.Second, - }) - require.NoError(t, err) + serializer := &telegrafJson.Serializer{ + TimestampUnits: time.Second, + } + require.NoError(t, serializer.Init()) ingestionType := "queued" if tC.ingestionType != "" { @@ -159,7 +158,10 @@ func TestWrite(t *testing.T) { } func TestCreateAzureDataExplorerTable(t *testing.T) { - serializer, _ := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second}) + serializer := &telegrafJson.Serializer{ + TimestampUnits: time.Second, + } + require.NoError(t, serializer.Init()) plugin := AzureDataExplorer{ Endpoint: "someendpoint", Database: "databasename", @@ -253,8 +255,10 @@ func TestWriteWithType(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - serializer, err := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second}) - require.NoError(t, err) + serializer := &telegrafJson.Serializer{ + TimestampUnits: time.Second, + } + require.NoError(t, serializer.Init()) for tableName, jsonValue := range testCase.tableNameToExpectedResult { ingestionType := "queued" if testCase.ingestionType != "" { diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index efa5b5af9..80d48f6f5 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -42,8 +42,11 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt /* End wrapper interface */ func TestInitAndWrite(t *testing.T) { - serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) - require.NoError(t, err) + serializer := &json.Serializer{ + TimestampUnits: time.Second, + } + require.NoError(t, serializer.Init()) + mockHub := &mockEventHub{} e := &EventHubs{ Hub: mockHub, @@ -60,8 +63,7 @@ func TestInitAndWrite(t *testing.T) { metrics := testutil.MockMetrics() mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - err = e.Write(metrics) - require.NoError(t, err) + require.NoError(t, e.Write(metrics)) mockHub.AssertExpectations(t) } @@ -101,8 +103,10 @@ func TestInitAndWriteIntegration(t *testing.T) { testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name // Configure the plugin to target the newly created hub - serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) - require.NoError(t, err) + serializer := &json.Serializer{ + TimestampUnits: time.Second, + } + require.NoError(t, serializer.Init()) e := &EventHubs{ Hub: &eventHub{}, ConnectionString: testHubCS, diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index 7dcd41900..aef008ce8 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -666,8 +666,9 @@ func TestBatchedUnbatched(t *testing.T) { influxSerializer := &influx.Serializer{} require.NoError(t, influxSerializer.Init()) - jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) - require.NoError(t, err) + jsonSerializer := &json.Serializer{TimestampUnits: time.Second} + require.NoError(t, jsonSerializer.Init()) + s := map[string]serializers.Serializer{ "influx": influxSerializer, "json": jsonSerializer, diff --git a/plugins/outputs/stomp/stomp_test.go b/plugins/outputs/stomp/stomp_test.go index d36d9dabc..7357edb6b 100644 --- a/plugins/outputs/stomp/stomp_test.go +++ b/plugins/outputs/stomp/stomp_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/testutil" ) @@ -28,12 +28,13 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err, "failed to start container") defer container.Terminate() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s, err := serializers.NewJSONSerializer( - &serializers.Config{ - TimestampUnits: 10 * time.Second, - TimestampFormat: "yyy-dd-mmThh:mm:ss", - }) - require.NoError(t, err) + + s := &json.Serializer{ + TimestampUnits: 10 * time.Second, + TimestampFormat: "yyy-dd-mmThh:mm:ss", + } + require.NoError(t, s.Init()) + st := &STOMP{ Host: url, QueueName: "test_queue", diff --git a/plugins/serializers/all/json.go b/plugins/serializers/all/json.go new file mode 100644 index 000000000..a98609b99 --- /dev/null +++ b/plugins/serializers/all/json.go @@ -0,0 +1,7 @@ +//go:build !custom || serializers || serializers.json + +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/serializers/json" // register plugin +) diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index dc783d66e..8a2201b24 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -11,47 +11,51 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/plugins/serializers" ) -type FormatConfig struct { - TimestampUnits time.Duration - TimestampFormat string - Transformation string - NestedFieldsInclude []string - NestedFieldsExclude []string -} - type Serializer struct { - TimestampUnits time.Duration - TimestampFormat string + TimestampUnits time.Duration `toml:"json_timestamp_units"` + TimestampFormat string `toml:"json_timestamp_format"` + Transformation string `toml:"json_transformation"` + NestedFieldsInclude []string `toml:"json_nested_fields_include"` + NestedFieldsExclude []string `toml:"json_nested_fields_exclude"` - transformation string - nestedfields filter.Filter + nestedfields filter.Filter } -func NewSerializer(cfg FormatConfig) (*Serializer, error) { - s := &Serializer{ - TimestampUnits: truncateDuration(cfg.TimestampUnits), - TimestampFormat: cfg.TimestampFormat, - transformation: cfg.Transformation, +func (s *Serializer) Init() error { + // Default precision is 1s + if s.TimestampUnits <= 0 { + s.TimestampUnits = time.Second } - if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 { - f, err := filter.NewIncludeExcludeFilter(cfg.NestedFieldsInclude, cfg.NestedFieldsExclude) + // Search for the power of ten less than the duration + d := time.Nanosecond + for { + if d*10 > s.TimestampUnits { + s.TimestampUnits = d + break + } + d = d * 10 + } + + if len(s.NestedFieldsInclude) > 0 || len(s.NestedFieldsExclude) > 0 { + f, err := filter.NewIncludeExcludeFilter(s.NestedFieldsInclude, s.NestedFieldsExclude) if err != nil { - return nil, err + return err } s.nestedfields = f } - return s, nil + return nil } func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { var obj interface{} obj = s.createObject(metric) - if s.transformation != "" { + if s.Transformation != "" { var err error if obj, err = s.transform(obj); err != nil { if errors.Is(err, jsonata.ErrUndefined) { @@ -82,7 +86,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { "metrics": objects, } - if s.transformation != "" { + if s.Transformation != "" { var err error if obj, err = s.transform(obj); err != nil { if errors.Is(err, jsonata.ErrUndefined) { @@ -143,7 +147,7 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{} } func (s *Serializer) transform(obj interface{}) (interface{}, error) { - transformation, err := jsonata.Compile(s.transformation) + transformation, err := jsonata.Compile(s.Transformation) if err != nil { return nil, err } @@ -151,18 +155,21 @@ func (s *Serializer) transform(obj interface{}) (interface{}, error) { return transformation.Eval(obj) } -func truncateDuration(units time.Duration) time.Duration { - // Default precision is 1s - if units <= 0 { - return time.Second - } - - // Search for the power of ten less than the duration - d := time.Nanosecond - for { - if d*10 > units { - return d - } - d = d * 10 - } +func init() { + serializers.Add("json", + func() serializers.Serializer { + return &Serializer{} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (s *Serializer) InitFromConfig(cfg *serializers.Config) error { + s.TimestampUnits = cfg.TimestampUnits + s.TimestampFormat = cfg.TimestampFormat + s.Transformation = cfg.Transformation + s.NestedFieldsInclude = cfg.JSONNestedFieldInclude + s.NestedFieldsExclude = cfg.JSONNestedFieldExclude + + return nil } diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index 4526db272..a78f7cbc0 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -29,8 +29,8 @@ func TestSerializeMetricFloat(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.Serialize(m) require.NoError(t, err) expS := []byte(fmt.Sprintf(`{"fields":{"usage_idle":91.5},"name":"cpu","tags":{"cpu":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n") @@ -90,11 +90,11 @@ func TestSerialize_TimestampUnits(t *testing.T) { }, time.Unix(1525478795, 123456789), ) - s, err := NewSerializer(FormatConfig{ + s := Serializer{ TimestampUnits: tt.timestampUnits, TimestampFormat: tt.timestampFormat, - }) - require.NoError(t, err) + } + require.NoError(t, s.Init()) actual, err := s.Serialize(m) require.NoError(t, err) require.Equal(t, tt.expected+"\n", string(actual)) @@ -112,8 +112,8 @@ func TestSerializeMetricInt(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.Serialize(m) require.NoError(t, err) @@ -131,8 +131,8 @@ func TestSerializeMetricString(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.Serialize(m) require.NoError(t, err) @@ -151,8 +151,8 @@ func TestSerializeMultiFields(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.Serialize(m) require.NoError(t, err) @@ -170,8 +170,8 @@ func TestSerializeMetricWithEscapes(t *testing.T) { } m := metric.New("My CPU", tags, fields, now) - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.Serialize(m) require.NoError(t, err) @@ -188,10 +188,10 @@ func TestSerializeBatch(t *testing.T) { }, time.Unix(0, 0), ) - metrics := []telegraf.Metric{m, m} - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal( @@ -214,8 +214,8 @@ func TestSerializeBatchSkipInf(t *testing.T) { ), } - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte(`{"metrics":[{"fields":{"time_idle":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf) @@ -233,8 +233,8 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) { ), } - s, err := NewSerializer(FormatConfig{}) - require.NoError(t, err) + s := Serializer{} + require.NoError(t, s.Init()) buf, err := s.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte(`{"metrics":[{"fields":{},"name":"cpu","tags":{},"timestamp":0}]}`), buf) @@ -269,13 +269,13 @@ func TestSerializeTransformationNonBatch(t *testing.T) { expected := expectedArray.([]interface{}) // Serialize - serializer, err := NewSerializer( - FormatConfig{ - TimestampUnits: cfg.TimestampUnits, - TimestampFormat: cfg.TimestampFormat, - Transformation: cfg.Transformation, - }) - require.NoError(t, err) + serializer := Serializer{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + } + require.NoError(t, serializer.Init()) + for i, m := range metrics { buf, err := serializer.Serialize(m) require.NoError(t, err) @@ -317,13 +317,13 @@ func TestSerializeTransformationBatch(t *testing.T) { require.NoError(t, err) // Serialize - serializer, err := NewSerializer( - FormatConfig{ - TimestampUnits: cfg.TimestampUnits, - TimestampFormat: cfg.TimestampFormat, - Transformation: cfg.Transformation, - }) - require.NoError(t, err) + serializer := Serializer{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + } + require.NoError(t, serializer.Init()) + buf, err := serializer.SerializeBatch(metrics) require.NoError(t, err) @@ -376,12 +376,10 @@ func TestSerializeTransformationIssue12734(t *testing.T) { } // Setup serializer - serializer, err := NewSerializer( - FormatConfig{ - Transformation: transformation, - }, - ) - require.NoError(t, err) + serializer := Serializer{ + Transformation: transformation, + } + require.NoError(t, serializer.Init()) // Check multiple serializations as issue #12734 shows that the // transformation breaks after the first iteration @@ -433,15 +431,14 @@ func TestSerializeNesting(t *testing.T) { expected := expectedArray.(map[string]interface{}) // Serialize - serializer, err := NewSerializer( - FormatConfig{ - TimestampUnits: cfg.TimestampUnits, - TimestampFormat: cfg.TimestampFormat, - Transformation: cfg.Transformation, - NestedFieldsInclude: cfg.JSONNestedFieldsInclude, - NestedFieldsExclude: cfg.JSONNestedFieldsExclude, - }) - require.NoError(t, err) + serializer := Serializer{ + TimestampUnits: cfg.TimestampUnits, + TimestampFormat: cfg.TimestampFormat, + Transformation: cfg.Transformation, + NestedFieldsInclude: cfg.JSONNestedFieldsInclude, + NestedFieldsExclude: cfg.JSONNestedFieldsExclude, + } + require.NoError(t, serializer.Init()) buf, err := serializer.Serialize(metrics[0]) require.NoError(t, err) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 36c2abe54..ab4988ae1 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/plugins/serializers/msgpack" "github.com/influxdata/telegraf/plugins/serializers/nowmetric" "github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite" @@ -162,8 +163,6 @@ func NewSerializer(config *Config) (Serializer, error) { var err error var serializer Serializer switch config.DataFormat { - case "json": - serializer, err = NewJSONSerializer(config) case "splunkmetric": serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil case "nowmetric": @@ -243,16 +242,6 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions) } -func NewJSONSerializer(config *Config) (Serializer, error) { - return json.NewSerializer(json.FormatConfig{ - TimestampUnits: config.TimestampUnits, - TimestampFormat: config.TimestampFormat, - Transformation: config.Transformation, - NestedFieldsInclude: config.JSONNestedFieldInclude, - NestedFieldsExclude: config.JSONNestedFieldExclude, - }) -} - func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer { return splunkmetric.NewSerializer(splunkmetricHecRouting, splunkmetricMultimetric, splunkmetricOmitEventTag) }