chore(serializers.json): Migrate to new-style framework (#13335)

This commit is contained in:
Sven Rebhan 2023-05-25 21:22:14 +02:00 committed by GitHub
parent a1743269cc
commit ece214e5a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 135 additions and 134 deletions

View File

@ -1481,12 +1481,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error)
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldString(tbl, "json_transformation", &sc.Transformation)
c.getFieldStringSlice(tbl, "json_nested_fields_include", &sc.JSONNestedFieldInclude)
c.getFieldStringSlice(tbl, "json_nested_fields_exclude", &sc.JSONNestedFieldExclude)
c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
c.getFieldBool(tbl, "splunkmetric_omit_event_tag", &sc.SplunkmetricOmitEventTag)
@ -1566,9 +1560,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
// Serializer options to ignore
case "prefix", "template", "templates",
"influx_max_line_bytes", "influx_sort_fields", "influx_uint_support",
"json_timestamp_format", "json_timestamp_units", "json_transformation",
"json_nested_fields_include", "json_nested_fields_exclude",
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"prometheus_compact_encoding",
"splunkmetric_hec_routing", "splunkmetric_multimetric", "splunkmetric_omit_event_tag",

View File

@ -248,11 +248,11 @@ func (adx *AzureDataExplorer) Init() error {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}
serializer, err := json.NewSerializer(json.FormatConfig{
serializer := &json.Serializer{
TimestampUnits: time.Nanosecond,
TimestampFormat: time.RFC3339Nano,
})
if err != nil {
}
if err := serializer.Init(); err != nil {
return err
}
adx.serializer = serializer

View File

@ -105,11 +105,10 @@ func TestWrite(t *testing.T) {
for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(
telegrafJson.FormatConfig{
TimestampUnits: time.Second,
})
require.NoError(t, err)
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
require.NoError(t, serializer.Init())
ingestionType := "queued"
if tC.ingestionType != "" {
@ -159,7 +158,10 @@ func TestWrite(t *testing.T) {
}
func TestCreateAzureDataExplorerTable(t *testing.T) {
serializer, _ := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second})
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
require.NoError(t, serializer.Init())
plugin := AzureDataExplorer{
Endpoint: "someendpoint",
Database: "databasename",
@ -253,8 +255,10 @@ func TestWriteWithType(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(telegrafJson.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
require.NoError(t, serializer.Init())
for tableName, jsonValue := range testCase.tableNameToExpectedResult {
ingestionType := "queued"
if testCase.ingestionType != "" {

View File

@ -42,8 +42,11 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
/* End wrapper interface */
func TestInitAndWrite(t *testing.T) {
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
serializer := &json.Serializer{
TimestampUnits: time.Second,
}
require.NoError(t, serializer.Init())
mockHub := &mockEventHub{}
e := &EventHubs{
Hub: mockHub,
@ -60,8 +63,7 @@ func TestInitAndWrite(t *testing.T) {
metrics := testutil.MockMetrics()
mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
err = e.Write(metrics)
require.NoError(t, err)
require.NoError(t, e.Write(metrics))
mockHub.AssertExpectations(t)
}
@ -101,8 +103,10 @@ func TestInitAndWriteIntegration(t *testing.T) {
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
// Configure the plugin to target the newly created hub
serializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
serializer := &json.Serializer{
TimestampUnits: time.Second,
}
require.NoError(t, serializer.Init())
e := &EventHubs{
Hub: &eventHub{},
ConnectionString: testHubCS,

View File

@ -666,8 +666,9 @@ func TestBatchedUnbatched(t *testing.T) {
influxSerializer := &influx.Serializer{}
require.NoError(t, influxSerializer.Init())
jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
jsonSerializer := &json.Serializer{TimestampUnits: time.Second}
require.NoError(t, jsonSerializer.Init())
s := map[string]serializers.Serializer{
"influx": influxSerializer,
"json": jsonSerializer,

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -28,12 +28,13 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewJSONSerializer(
&serializers.Config{
TimestampUnits: 10 * time.Second,
TimestampFormat: "yyy-dd-mmThh:mm:ss",
})
require.NoError(t, err)
s := &json.Serializer{
TimestampUnits: 10 * time.Second,
TimestampFormat: "yyy-dd-mmThh:mm:ss",
}
require.NoError(t, s.Init())
st := &STOMP{
Host: url,
QueueName: "test_queue",

View File

@ -0,0 +1,7 @@
//go:build !custom || serializers || serializers.json
package all
import (
_ "github.com/influxdata/telegraf/plugins/serializers/json" // register plugin
)

View File

@ -11,47 +11,51 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/serializers"
)
type FormatConfig struct {
TimestampUnits time.Duration
TimestampFormat string
Transformation string
NestedFieldsInclude []string
NestedFieldsExclude []string
}
type Serializer struct {
TimestampUnits time.Duration
TimestampFormat string
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
NestedFieldsInclude []string `toml:"json_nested_fields_include"`
NestedFieldsExclude []string `toml:"json_nested_fields_exclude"`
transformation string
nestedfields filter.Filter
nestedfields filter.Filter
}
func NewSerializer(cfg FormatConfig) (*Serializer, error) {
s := &Serializer{
TimestampUnits: truncateDuration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
transformation: cfg.Transformation,
func (s *Serializer) Init() error {
// Default precision is 1s
if s.TimestampUnits <= 0 {
s.TimestampUnits = time.Second
}
if len(cfg.NestedFieldsInclude) > 0 || len(cfg.NestedFieldsExclude) > 0 {
f, err := filter.NewIncludeExcludeFilter(cfg.NestedFieldsInclude, cfg.NestedFieldsExclude)
// Search for the power of ten less than the duration
d := time.Nanosecond
for {
if d*10 > s.TimestampUnits {
s.TimestampUnits = d
break
}
d = d * 10
}
if len(s.NestedFieldsInclude) > 0 || len(s.NestedFieldsExclude) > 0 {
f, err := filter.NewIncludeExcludeFilter(s.NestedFieldsInclude, s.NestedFieldsExclude)
if err != nil {
return nil, err
return err
}
s.nestedfields = f
}
return s, nil
return nil
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
var obj interface{}
obj = s.createObject(metric)
if s.transformation != "" {
if s.Transformation != "" {
var err error
if obj, err = s.transform(obj); err != nil {
if errors.Is(err, jsonata.ErrUndefined) {
@ -82,7 +86,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
"metrics": objects,
}
if s.transformation != "" {
if s.Transformation != "" {
var err error
if obj, err = s.transform(obj); err != nil {
if errors.Is(err, jsonata.ErrUndefined) {
@ -143,7 +147,7 @@ func (s *Serializer) createObject(metric telegraf.Metric) map[string]interface{}
}
func (s *Serializer) transform(obj interface{}) (interface{}, error) {
transformation, err := jsonata.Compile(s.transformation)
transformation, err := jsonata.Compile(s.Transformation)
if err != nil {
return nil, err
}
@ -151,18 +155,21 @@ func (s *Serializer) transform(obj interface{}) (interface{}, error) {
return transformation.Eval(obj)
}
func truncateDuration(units time.Duration) time.Duration {
// Default precision is 1s
if units <= 0 {
return time.Second
}
// Search for the power of ten less than the duration
d := time.Nanosecond
for {
if d*10 > units {
return d
}
d = d * 10
}
// init registers the JSON serializer under the "json" data-format name in the
// new-style serializer registry, so the framework can construct it on demand.
func init() {
serializers.Add("json",
// Factory returning a zero-value Serializer; the framework is expected
// to populate the toml-tagged options and then call Init().
func() serializers.Serializer {
return &Serializer{}
},
)
}
// InitFromConfig is a compatibility function to construct the serializer the
// old way: it copies the JSON-related options from the legacy
// serializers.Config into the new-style toml-tagged fields. It only copies
// values and never fails; note it does NOT call Init(), so defaults and the
// nested-field filter are presumably applied by a separate Init() call —
// TODO(review): confirm the framework invokes Init() after this.
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
s.TimestampUnits = cfg.TimestampUnits
s.TimestampFormat = cfg.TimestampFormat
s.Transformation = cfg.Transformation
s.NestedFieldsInclude = cfg.JSONNestedFieldInclude
s.NestedFieldsExclude = cfg.JSONNestedFieldExclude
return nil
}

View File

@ -29,8 +29,8 @@ func TestSerializeMetricFloat(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
expS := []byte(fmt.Sprintf(`{"fields":{"usage_idle":91.5},"name":"cpu","tags":{"cpu":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n")
@ -90,11 +90,11 @@ func TestSerialize_TimestampUnits(t *testing.T) {
},
time.Unix(1525478795, 123456789),
)
s, err := NewSerializer(FormatConfig{
s := Serializer{
TimestampUnits: tt.timestampUnits,
TimestampFormat: tt.timestampFormat,
})
require.NoError(t, err)
}
require.NoError(t, s.Init())
actual, err := s.Serialize(m)
require.NoError(t, err)
require.Equal(t, tt.expected+"\n", string(actual))
@ -112,8 +112,8 @@ func TestSerializeMetricInt(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -131,8 +131,8 @@ func TestSerializeMetricString(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -151,8 +151,8 @@ func TestSerializeMultiFields(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -170,8 +170,8 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
}
m := metric.New("My CPU", tags, fields, now)
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -188,10 +188,10 @@ func TestSerializeBatch(t *testing.T) {
},
time.Unix(0, 0),
)
metrics := []telegraf.Metric{m, m}
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(
@ -214,8 +214,8 @@ func TestSerializeBatchSkipInf(t *testing.T) {
),
}
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte(`{"metrics":[{"fields":{"time_idle":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
@ -233,8 +233,8 @@ func TestSerializeBatchSkipInfAllFields(t *testing.T) {
),
}
s, err := NewSerializer(FormatConfig{})
require.NoError(t, err)
s := Serializer{}
require.NoError(t, s.Init())
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte(`{"metrics":[{"fields":{},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
@ -269,13 +269,13 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
expected := expectedArray.([]interface{})
// Serialize
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
}
require.NoError(t, serializer.Init())
for i, m := range metrics {
buf, err := serializer.Serialize(m)
require.NoError(t, err)
@ -317,13 +317,13 @@ func TestSerializeTransformationBatch(t *testing.T) {
require.NoError(t, err)
// Serialize
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
})
require.NoError(t, err)
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
}
require.NoError(t, serializer.Init())
buf, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
@ -376,12 +376,10 @@ func TestSerializeTransformationIssue12734(t *testing.T) {
}
// Setup serializer
serializer, err := NewSerializer(
FormatConfig{
Transformation: transformation,
},
)
require.NoError(t, err)
serializer := Serializer{
Transformation: transformation,
}
require.NoError(t, serializer.Init())
// Check multiple serializations as issue #12734 shows that the
// transformation breaks after the first iteration
@ -433,15 +431,14 @@ func TestSerializeNesting(t *testing.T) {
expected := expectedArray.(map[string]interface{})
// Serialize
serializer, err := NewSerializer(
FormatConfig{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
NestedFieldsInclude: cfg.JSONNestedFieldsInclude,
NestedFieldsExclude: cfg.JSONNestedFieldsExclude,
})
require.NoError(t, err)
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
NestedFieldsInclude: cfg.JSONNestedFieldsInclude,
NestedFieldsExclude: cfg.JSONNestedFieldsExclude,
}
require.NoError(t, serializer.Init())
buf, err := serializer.Serialize(metrics[0])
require.NoError(t, err)

View File

@ -6,6 +6,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/msgpack"
"github.com/influxdata/telegraf/plugins/serializers/nowmetric"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite"
@ -162,8 +163,6 @@ func NewSerializer(config *Config) (Serializer, error) {
var err error
var serializer Serializer
switch config.DataFormat {
case "json":
serializer, err = NewJSONSerializer(config)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil
case "nowmetric":
@ -243,16 +242,6 @@ func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []stri
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
}
func NewJSONSerializer(config *Config) (Serializer, error) {
return json.NewSerializer(json.FormatConfig{
TimestampUnits: config.TimestampUnits,
TimestampFormat: config.TimestampFormat,
Transformation: config.Transformation,
NestedFieldsInclude: config.JSONNestedFieldInclude,
NestedFieldsExclude: config.JSONNestedFieldExclude,
})
}
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer {
return splunkmetric.NewSerializer(splunkmetricHecRouting, splunkmetricMultimetric, splunkmetricOmitEventTag)
}