diff --git a/agent/agent.go b/agent/agent.go index cdced1e43..9f9c54714 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -961,9 +961,7 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error { wg.Add(1) go func() { defer wg.Done() - s := influx.NewSerializer() - s.SetFieldSortOrder(influx.SortFields) - + s := &influx.Serializer{SortFields: true} for metric := range src { octets, err := s.Serialize(metric) if err == nil { diff --git a/cmd/telegraf/main.go b/cmd/telegraf/main.go index 65bcda89a..fe40cac1f 100644 --- a/cmd/telegraf/main.go +++ b/cmd/telegraf/main.go @@ -23,6 +23,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/all" _ "github.com/influxdata/telegraf/plugins/processors/all" _ "github.com/influxdata/telegraf/plugins/secretstores/all" + _ "github.com/influxdata/telegraf/plugins/serializers/all" ) type TelegrafConfig interface { diff --git a/config/config.go b/config/config.go index 8f8b178ea..55db82400 100644 --- a/config/config.go +++ b/config/config.go @@ -952,16 +952,45 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) } } - conf := c.buildParser(parentname, table) if err := c.toml.UnmarshalTable(table, parser); err != nil { return nil, err } + conf := &models.ParserConfig{ + Parent: parentname, + DataFormat: dataformat, + } running := models.NewRunningParser(parser, conf) err := running.Init() return running, err } +func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) { + var dataformat string + c.getFieldString(table, "data_format", &dataformat) + if dataformat == "" { + dataformat = "influx" + } + + creator, ok := serializers.Serializers[dataformat] + if !ok { + return nil, fmt.Errorf("undefined but requested serializer: %s", dataformat) + } + serializer := creator() + + if err := c.toml.UnmarshalTable(table, serializer); err != nil { + return nil, err + } + + conf := &models.SerializerConfig{ + Parent: parentname, + DataFormat: dataformat, + } + running := models.NewRunningSerializer(serializer, conf) + err := running.Init() + return running, err +} + func (c *Config) addProcessor(name string, table *ast.Table) error { creator, ok := processors.Processors[name] if !ok { @@ -1070,6 +1099,18 @@ func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil } + + // For inputs with parsers we need to compute the set of + // options that is not covered by both, the parser and the input. + // We achieve this by keeping a local book of missing entries + // that counts the number of misses. In case we have a parser + // for the input both need to miss the entry. We count the + // missing entries at the end. + missThreshold := 0 + missCount := make(map[string]int) + c.setLocalMissingTomlFieldTracker(missCount) + defer c.resetMissingTomlFieldTracker() + creator, ok := outputs.Outputs[name] if !ok { // Handle removed, deprecated plugins @@ -1083,12 +1124,34 @@ func (c *Config) addOutput(name string, table *ast.Table) error { // If the output has a SetSerializer function, then this means it can write // arbitrary types of output, so build the serializer and set it. - if t, ok := output.(serializers.SerializerOutput); ok { - serializer, err := c.buildSerializer(table) - if err != nil { - return err + if t, ok := output.(telegraf.SerializerPlugin); ok { + missThreshold = 1 + if serializer, err := c.addSerializer(name, table); err == nil { + t.SetSerializer(serializer) + } else { + missThreshold = 0 + // Fallback to the old way of instantiating the parsers. + serializer, err := c.buildSerializerOld(table) + if err != nil { + return err + } + t.SetSerializer(serializer) + } + } else if t, ok := output.(serializers.SerializerOutput); ok { + // Keep the old interface for backward compatibility + // DEPRECATED: Please switch your plugin to telegraf.Serializers + missThreshold = 1 + if serializer, err := c.addSerializer(name, table); err == nil { + t.SetSerializer(serializer) + } else { + missThreshold = 0 + // Fallback to the old way of instantiating the parsers. + serializer, err := c.buildSerializerOld(table) + if err != nil { + return err + } + t.SetSerializer(serializer) } - t.SetSerializer(serializer) } outputConfig, err := c.buildOutput(name, table) @@ -1110,8 +1173,19 @@ func (c *Config) addOutput(name string, table *ast.Table) error { } } + // Check the number of misses against the threshold + for key, count := range missCount { + if count <= missThreshold { + continue + } + if err := c.missingTomlField(nil, key); err != nil { + return err + } + } + ro := models.NewRunningOutput(output, outputConfig, c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit) c.Outputs = append(c.Outputs, ro) + return nil } @@ -1205,10 +1279,6 @@ func (c *Config) addInput(name string, table *ast.Table) error { } } - rp := models.NewRunningInput(input, pluginConfig) - rp.SetDefaultTags(c.Tags) - c.Inputs = append(c.Inputs, rp) - // Check the number of misses against the threshold for key, count := range missCount { if count <= missCountThreshold { @@ -1219,6 +1289,10 @@ func (c *Config) addInput(name string, table *ast.Table) error { } } + rp := models.NewRunningInput(input, pluginConfig) + rp.SetDefaultTags(c.Tags) + c.Inputs = append(c.Inputs, rp) + return nil } @@ -1266,21 +1340,6 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato return conf, err } -// buildParser parses Parser specific items from the ast.Table, -// builds the filter and returns a -// models.ParserConfig to be inserted into models.RunningParser -func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig { - var dataFormat string - c.getFieldString(tbl, "data_format", &dataFormat) - - conf := &models.ParserConfig{ - Parent: name, - DataFormat: dataFormat, - } - - return conf -} - // buildProcessor parses Processor specific items from the ast.Table, // builds the filter and returns a // models.ProcessorConfig to be inserted into models.RunningProcessor @@ -1376,10 +1435,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e return cp, err } -// buildSerializer grabs the necessary entries from the ast.Table for creating +// buildSerializerOld grabs the necessary entries from the ast.Table for creating // a serializers.Serializer object, and creates it, which can then be added onto // an Output object. -func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) { +func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) { sc := &serializers.Config{TimestampUnits: 1 * time.Second} c.getFieldString(tbl, "data_format", &sc.DataFormat) @@ -1524,6 +1583,7 @@ func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) { root = root || pt.Implements(reflect.TypeOf((*telegraf.Aggregator)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Processor)(nil)).Elem()) root = root || pt.Implements(reflect.TypeOf((*telegraf.Parser)(nil)).Elem()) + root = root || pt.Implements(reflect.TypeOf((*telegraf.Serializer)(nil)).Elem()) c, ok := counter[key] if !root { diff --git a/config/config_test.go b/config/config_test.go index 9cb58a37b..3cce7d411 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -9,6 +9,7 @@ import ( "os/exec" "path/filepath" "reflect" + "regexp" "runtime" "strings" "sync" @@ -29,6 +30,8 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/serializers" + _ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing ) func TestReadBinaryFile(t *testing.T) { @@ -511,6 +514,184 @@ func TestConfig_URLLikeFileName(t *testing.T) { } } +func TestConfig_SerializerInterfaceNewFormat(t *testing.T) { + formats := []string{ + "carbon2", + "csv", + "graphite", + "influx", + "json", + "msgpack", + "nowmetric", + "prometheus", + "prometheusremotewrite", + "splunkmetric", + "wavefront", + } + + c := NewConfig() + require.NoError(t, c.LoadConfig("./testdata/serializers_new.toml")) + require.Len(t, c.Outputs, len(formats)) + + cfg := serializers.Config{} + override := map[string]struct { + param map[string]interface{} + mask []string + }{} + + expected := make([]telegraf.Serializer, 0, len(formats)) + for _, format := range formats { + formatCfg := &cfg + formatCfg.DataFormat = format + + logger := models.NewLogger("serializers", format, "test") + + var serializer telegraf.Serializer + if creator, found := serializers.Serializers[format]; found { + serializer = creator() + } else { + var err error + serializer, err = serializers.NewSerializer(formatCfg) + require.NoErrorf(t, err, "No serializer for format %q", format) + } + + if settings, found := override[format]; found { + s := reflect.Indirect(reflect.ValueOf(serializer)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + models.SetLoggerOnPlugin(serializer, logger) + if s, ok := serializer.(telegraf.Initializer); ok { + require.NoError(t, s.Init()) + } + expected = append(expected, serializer) + } + require.Len(t, expected, len(formats)) + + actual := make([]interface{}, 0) + for _, plugin := range c.Outputs { + output, ok := plugin.Output.(*MockupOutputPluginSerializerNew) + require.True(t, ok) + // Get the parser set with 'SetParser()' + if p, ok := output.Serializer.(*models.RunningSerializer); ok { + actual = append(actual, p.Serializer) + } else { + actual = append(actual, output.Serializer) + } + } + require.Len(t, actual, len(formats)) + + for i, format := range formats { + // Determine the underlying type of the serializer + stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface() + // Ignore all unexported fields and fields not relevant for functionality + options := []cmp.Option{ + cmpopts.IgnoreUnexported(stype), + cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), + cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), + } + if settings, found := override[format]; found { + options = append(options, cmpopts.IgnoreFields(stype, settings.mask...)) + } + + // Do a manual comparision as require.EqualValues will also work on unexported fields + // that cannot be cleared or ignored. + diff := cmp.Diff(expected[i], actual[i], options...) + require.Emptyf(t, diff, "Difference in SetSerializer() for %q", format) + } +} + +func TestConfig_SerializerInterfaceOldFormat(t *testing.T) { + formats := []string{ + "carbon2", + "csv", + "graphite", + "influx", + "json", + "msgpack", + "nowmetric", + "prometheus", + "prometheusremotewrite", + "splunkmetric", + "wavefront", + } + + c := NewConfig() + require.NoError(t, c.LoadConfig("./testdata/serializers_old.toml")) + require.Len(t, c.Outputs, len(formats)) + + cfg := serializers.Config{} + override := map[string]struct { + param map[string]interface{} + mask []string + }{} + + expected := make([]telegraf.Serializer, 0, len(formats)) + for _, format := range formats { + formatCfg := &cfg + formatCfg.DataFormat = format + + logger := models.NewLogger("serializers", format, "test") + + var serializer serializers.Serializer + if creator, found := serializers.Serializers[format]; found { + serializer = creator() + } else { + var err error + serializer, err = serializers.NewSerializer(formatCfg) + require.NoErrorf(t, err, "No serializer for format %q", format) + } + + if settings, found := override[format]; found { + s := reflect.Indirect(reflect.ValueOf(serializer)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + models.SetLoggerOnPlugin(serializer, logger) + if s, ok := serializer.(telegraf.Initializer); ok { + require.NoError(t, s.Init()) + } + expected = append(expected, serializer) + } + require.Len(t, expected, len(formats)) + + actual := make([]interface{}, 0) + for _, plugin := range c.Outputs { + output, ok := plugin.Output.(*MockupOutputPluginSerializerOld) + require.True(t, ok) + // Get the parser set with 'SetParser()' + if p, ok := output.Serializer.(*models.RunningSerializer); ok { + actual = append(actual, p.Serializer) + } else { + actual = append(actual, output.Serializer) + } + } + require.Len(t, actual, len(formats)) + + for i, format := range formats { + // Determine the underlying type of the serializer + stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface() + // Ignore all unexported fields and fields not relevant for functionality + options := []cmp.Option{ + cmpopts.IgnoreUnexported(stype), + cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), + cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), + } + if settings, found := override[format]; found { + options = append(options, cmpopts.IgnoreFields(stype, settings.mask...)) + } + + // Do a manual comparison as require.EqualValues will also work on unexported fields + // that cannot be cleared or ignored. + diff := cmp.Diff(expected[i], actual[i], options...) + require.Emptyf(t, diff, "Difference in SetSerializer() for %q", format) + } +} + func TestConfig_ParserInterfaceNewFormat(t *testing.T) { formats := []string{ "collectd", @@ -1341,6 +1522,47 @@ func (m *MockupOuputPlugin) Write(_ []telegraf.Metric) error { return nil } +/*** Mockup OUTPUT plugin for serializer testing to avoid cyclic dependencies ***/ +type MockupOutputPluginSerializerOld struct { + Serializer serializers.Serializer +} + +func (m *MockupOutputPluginSerializerOld) SetSerializer(s serializers.Serializer) { + m.Serializer = s +} +func (*MockupOutputPluginSerializerOld) Connect() error { + return nil +} +func (*MockupOutputPluginSerializerOld) Close() error { + return nil +} +func (*MockupOutputPluginSerializerOld) SampleConfig() string { + return "Mockup test output plugin" +} +func (*MockupOutputPluginSerializerOld) Write(_ []telegraf.Metric) error { + return nil +} + +type MockupOutputPluginSerializerNew struct { + Serializer telegraf.Serializer +} + +func (m *MockupOutputPluginSerializerNew) SetSerializer(s telegraf.Serializer) { + m.Serializer = s +} +func (*MockupOutputPluginSerializerNew) Connect() error { + return nil +} +func (*MockupOutputPluginSerializerNew) Close() error { + return nil +} +func (*MockupOutputPluginSerializerNew) SampleConfig() string { + return "Mockup test output plugin" +} +func (*MockupOutputPluginSerializerNew) Write(_ []telegraf.Metric) error { + return nil +} + /*** Mockup INPUT plugin with state for testing to avoid cyclic dependencies ***/ type MockupState struct { Name string @@ -1457,4 +1679,10 @@ func init() { outputs.Add("http", func() telegraf.Output { return &MockupOuputPlugin{} }) + outputs.Add("serializer_test_new", func() telegraf.Output { + return &MockupOutputPluginSerializerNew{} + }) + outputs.Add("serializer_test_old", func() telegraf.Output { + return &MockupOutputPluginSerializerOld{} + }) } diff --git a/config/testdata/serializers_new.toml b/config/testdata/serializers_new.toml new file mode 100644 index 000000000..d11139e7a --- /dev/null +++ b/config/testdata/serializers_new.toml @@ -0,0 +1,32 @@ +[[outputs.serializer_test_new]] + data_format = "carbon2" + +[[outputs.serializer_test_new]] + data_format = "csv" + +[[outputs.serializer_test_new]] + data_format = "graphite" + +[[outputs.serializer_test_new]] + data_format = "influx" + +[[outputs.serializer_test_new]] + data_format = "json" + +[[outputs.serializer_test_new]] + data_format = "msgpack" + +[[outputs.serializer_test_new]] + data_format = "nowmetric" + +[[outputs.serializer_test_new]] + data_format = "prometheus" + +[[outputs.serializer_test_new]] + data_format = "prometheusremotewrite" + +[[outputs.serializer_test_new]] + data_format = "splunkmetric" + +[[outputs.serializer_test_new]] + data_format = "wavefront" diff --git a/config/testdata/serializers_old.toml b/config/testdata/serializers_old.toml new file mode 100644 index 000000000..6b556ca09 --- /dev/null +++ b/config/testdata/serializers_old.toml @@ -0,0 +1,32 @@ +[[outputs.serializer_test_old]] + data_format = "carbon2" + +[[outputs.serializer_test_old]] + data_format = "csv" + +[[outputs.serializer_test_old]] + data_format = "graphite" + +[[outputs.serializer_test_old]] + data_format = "influx" + +[[outputs.serializer_test_old]] + data_format = "json" + +[[outputs.serializer_test_old]] + data_format = "msgpack" + +[[outputs.serializer_test_old]] + data_format = "nowmetric" + +[[outputs.serializer_test_old]] + data_format = "prometheus" + +[[outputs.serializer_test_old]] + data_format = "prometheusremotewrite" + +[[outputs.serializer_test_old]] + data_format = "splunkmetric" + +[[outputs.serializer_test_old]] + data_format = "wavefront" diff --git a/models/running_serializer.go b/models/running_serializer.go new file mode 100644 index 000000000..580ab21a8 --- /dev/null +++ b/models/running_serializer.go @@ -0,0 +1,102 @@ +package models + +import ( + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/selfstat" +) + +// SerializerConfig is the common config for all serializers. +type SerializerConfig struct { + Parent string + Alias string + DataFormat string + DefaultTags map[string]string +} + +type RunningSerializer struct { + Serializer serializers.Serializer + Config *SerializerConfig + log telegraf.Logger + + MetricsSerialized selfstat.Stat + BytesSerialized selfstat.Stat + SerializationTime selfstat.Stat +} + +func NewRunningSerializer(serializer serializers.Serializer, config *SerializerConfig) *RunningSerializer { + tags := map[string]string{"type": config.DataFormat} + if config.Alias != "" { + tags["alias"] = config.Alias + } + + serializerErrorsRegister := selfstat.Register("serializer", "errors", tags) + logger := NewLogger("serializers", config.DataFormat+"::"+config.Parent, config.Alias) + logger.OnErr(func() { + serializerErrorsRegister.Incr(1) + }) + SetLoggerOnPlugin(serializer, logger) + + return &RunningSerializer{ + Serializer: serializer, + Config: config, + MetricsSerialized: selfstat.Register( + "serializer", + "metrics_serialized", + tags, + ), + BytesSerialized: selfstat.Register( + "serializer", + "bytes_serialized", + tags, + ), + SerializationTime: selfstat.Register( + "serializer", + "serialization_time_ns", + tags, + ), + log: logger, + } +} + +func (r *RunningSerializer) LogName() string { + return logName("parsers", r.Config.DataFormat+"::"+r.Config.Parent, r.Config.Alias) +} + +func (r *RunningSerializer) Init() error { + if p, ok := r.Serializer.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + +func (r *RunningSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { + start := time.Now() + buf, err := r.Serializer.Serialize(metric) + elapsed := time.Since(start) + r.SerializationTime.Incr(elapsed.Nanoseconds()) + r.MetricsSerialized.Incr(1) + r.BytesSerialized.Incr(int64(len(buf))) + + return buf, err +} + +func (r *RunningSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + start := time.Now() + buf, err := r.Serializer.SerializeBatch(metrics) + elapsed := time.Since(start) + r.SerializationTime.Incr(elapsed.Nanoseconds()) + r.MetricsSerialized.Incr(int64(len(metrics))) + r.BytesSerialized.Incr(int64(len(buf))) + + return buf, err +} + +func (r *RunningSerializer) Log() telegraf.Logger { + return r.log +} diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go index f70a2243e..2205bdce7 100644 --- a/plugins/common/shim/goshim.go +++ b/plugins/common/shim/goshim.go @@ -101,7 +101,10 @@ func hasQuit(ctx context.Context) bool { } func (s *Shim) writeProcessedMetrics() error { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + if err := serializer.Init(); err != nil { + return fmt.Errorf("creating serializer failed: %w", err) + } for { //nolint:gosimple // for-select used on purpose select { case m, open := <-s.metricCh: diff --git a/plugins/common/shim/output_test.go b/plugins/common/shim/output_test.go index d4f38e167..4340ecf43 100644 --- a/plugins/common/shim/output_test.go +++ b/plugins/common/shim/output_test.go @@ -10,7 +10,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -33,7 +33,8 @@ func TestOutputShim(t *testing.T) { wg.Done() }() - serializer := serializers.NewInfluxSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) m := metric.New("thing", map[string]string{ diff --git a/plugins/common/shim/processor_test.go b/plugins/common/shim/processor_test.go index aafcaf7c0..78a74fca4 100644 --- a/plugins/common/shim/processor_test.go +++ b/plugins/common/shim/processor_test.go @@ -13,7 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func TestProcessorShim(t *testing.T) { @@ -52,7 +52,9 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) { wg.Done() }() - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) + parser := influx.Parser{} require.NoError(t, parser.Init()) diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index bec8529bb..9871d8f78 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -18,7 +18,7 @@ import ( "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/prometheus" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -206,9 +206,12 @@ func TestMain(m *testing.M) { func runCounterProgram() error { envMetricName := os.Getenv("METRIC_NAME") - i := 0 - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + if err := serializer.Init(); err != nil { + return err + } + i := 0 scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { m := metric.New(envMetricName, diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index fb6278911..3ec28edca 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -104,7 +104,10 @@ func (s *Shim) Run(pollInterval time.Duration) error { collectMetricsPrompt := make(chan os.Signal, 1) listenForCollectMetricsSignals(ctx, collectMetricsPrompt) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + if err := serializer.Init(); err != nil { + return fmt.Errorf("creating serializer failed: %w", err) + } for _, input := range s.Inputs { wrappedInput := inputShim{Input: input} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 14f3e4da3..31b9ceab1 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -21,7 +21,7 @@ import ( kafkaOutput "github.com/influxdata/telegraf/plugins/outputs/kafka" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/value" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -540,7 +540,8 @@ func TestKafkaRoundTripIntegration(t *testing.T) { output, ok := creator().(*kafkaOutput.Kafka) require.True(t, ok) - s := serializers.NewInfluxSerializer() + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) output.SetSerializer(s) output.Brokers = brokers output.Topic = topic diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index 4deb79232..2fde64271 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -18,7 +18,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/serializers" + serializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) const ( @@ -62,7 +62,9 @@ type ( ) func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) { - s := serializers.NewInfluxSerializer() + // Instantiate a Influx line-protocol serializer + s := &serializer.Serializer{} + _ = s.Init() // We can ignore the error as the Init will never fail metrics := make([]telegraf.Metric, 0, len(testM)) t := &stubTopic{ diff --git a/plugins/outputs/exec/exec_test.go b/plugins/outputs/exec/exec_test.go index a153cff8b..688bac4f9 100644 --- a/plugins/outputs/exec/exec_test.go +++ b/plugins/outputs/exec/exec_test.go @@ -10,7 +10,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -57,7 +57,8 @@ func TestExec(t *testing.T) { runner: &CommandRunner{}, } - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) e.SetSerializer(s) require.NoError(t, e.Connect()) diff --git a/plugins/outputs/execd/execd_test.go b/plugins/outputs/execd/execd_test.go index 7c7680b44..e501ccf31 100644 --- a/plugins/outputs/execd/execd_test.go +++ b/plugins/outputs/execd/execd_test.go @@ -18,14 +18,15 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC) func TestExternalOutputWorks(t *testing.T) { - influxSerializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) exe, err := os.Executable() require.NoError(t, err) @@ -34,7 +35,7 @@ func TestExternalOutputWorks(t *testing.T) { Command: []string{exe, "-testoutput"}, Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, RestartDelay: config.Duration(5 * time.Second), - serializer: influxSerializer, + serializer: serializer, Log: testutil.Logger{}, } @@ -71,7 +72,8 @@ func TestExternalOutputWorks(t *testing.T) { } func TestPartiallyUnserializableThrowError(t *testing.T) { - influxSerializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) exe, err := os.Executable() require.NoError(t, err) @@ -81,7 +83,7 @@ func TestPartiallyUnserializableThrowError(t *testing.T) { Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, RestartDelay: config.Duration(5 * time.Second), IgnoreSerializationError: false, - serializer: influxSerializer, + serializer: serializer, Log: testutil.Logger{}, } @@ -107,7 +109,8 @@ func TestPartiallyUnserializableThrowError(t *testing.T) { } func TestPartiallyUnserializableCanBeSkipped(t *testing.T) { - influxSerializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) exe, err := os.Executable() require.NoError(t, err) @@ -117,7 +120,7 @@ func TestPartiallyUnserializableCanBeSkipped(t *testing.T) { Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, RestartDelay: config.Duration(5 * time.Second), IgnoreSerializationError: true, - serializer: influxSerializer, + serializer: serializer, Log: testutil.Logger{}, } diff --git a/plugins/outputs/file/file_test.go b/plugins/outputs/file/file_test.go index 946f6eebd..137d90910 100644 --- a/plugins/outputs/file/file_test.go +++ b/plugins/outputs/file/file_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -21,7 +21,10 @@ const ( func TestFileExistingFile(t *testing.T) { fh := createFile(t) - s := serializers.NewInfluxSerializer() + + s := &influx.Serializer{} + require.NoError(t, s.Init()) + f := File{ Files: []string{fh.Name()}, serializer: s, @@ -40,7 +43,9 @@ func TestFileExistingFile(t *testing.T) { } func TestFileNewFile(t *testing.T) { - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + fh := tmpFile(t) f := File{ Files: []string{fh}, @@ -64,7 +69,9 @@ func TestFileExistingFiles(t *testing.T) { fh2 := createFile(t) fh3 := createFile(t) - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + f := File{ Files: []string{fh1.Name(), fh2.Name(), fh3.Name()}, serializer: s, @@ -85,7 +92,9 @@ func TestFileExistingFiles(t *testing.T) { } func TestFileNewFiles(t *testing.T) { - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + fh1 := tmpFile(t) fh2 := tmpFile(t) fh3 := tmpFile(t) @@ -112,7 +121,9 @@ func TestFileBoth(t *testing.T) { fh1 := createFile(t) fh2 := tmpFile(t) - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + f := File{ Files: []string{fh1.Name(), fh2}, serializer: s, @@ -137,7 +148,9 @@ func TestFileStdout(t *testing.T) { r, w, _ := os.Pipe() os.Stdout = w - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + f := File{ Files: []string{"stdout"}, serializer: s, diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index 16aef4c8d..7dcd41900 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -112,7 +112,8 @@ func TestMethod(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() if tt.connectError { @@ -175,7 +176,8 @@ func TestHTTPClientConfig(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() if tt.connectError { @@ -267,7 +269,8 @@ func TestStatusCode(t *testing.T) { w.WriteHeader(tt.statusCode) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) @@ -316,7 +319,8 @@ func TestContentType(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) @@ -376,7 +380,8 @@ func TestContentEncodingGzip(t *testing.T) { w.WriteHeader(http.StatusNoContent) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) @@ -431,7 +436,8 @@ func TestBasicAuth(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) plugin.SetSerializer(serializer) require.NoError(t, plugin.Connect()) require.NoError(t, plugin.Write([]telegraf.Metric{getMetric()})) @@ -506,7 +512,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { } }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) @@ -604,7 +611,10 @@ func TestOAuthAuthorizationCodeGrant(t *testing.T) { } }) - tt.plugin.SetSerializer(influx.NewSerializer()) + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + tt.plugin.SetSerializer(serializer) + require.NoError(t, tt.plugin.Connect()) require.NoError(t, tt.plugin.Write([]telegraf.Metric{getMetric()})) require.NoError(t, err) @@ -630,7 +640,8 @@ func TestDefaultUserAgent(t *testing.T) { Method: defaultMethod, } - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) client.SetSerializer(serializer) err = client.Connect() require.NoError(t, err) @@ -652,10 +663,13 @@ func TestBatchedUnbatched(t *testing.T) { Method: defaultMethod, } + influxSerializer := &influx.Serializer{} + require.NoError(t, influxSerializer.Init()) + jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second}) require.NoError(t, err) s := map[string]serializers.Serializer{ - "influx": influx.NewSerializer(), + "influx": influxSerializer, "json": jsonSerializer, } @@ -726,7 +740,8 @@ func TestAwsCredentials(t *testing.T) { tt.handler(t, w, r) }) - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index e25d62498..983a33a36 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -155,7 +155,10 @@ func NewHTTPClient(cfg HTTPConfig) (*httpClient, error) { } if cfg.Serializer == nil { - cfg.Serializer = influx.NewSerializer() + cfg.Serializer = &influx.Serializer{} + if err := cfg.Serializer.Init(); err != nil { + return nil, err + } } var transport *http.Transport diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 4897a00bc..9dbf3ac4a 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -169,10 +169,15 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { } func (i *InfluxDB) udpClient(address *url.URL) (Client, error) { + serializer := &influx.Serializer{UintSupport: true} + if err := serializer.Init(); err != nil { + return nil, err + } + udpConfig := &UDPConfig{ URL: address, MaxPayloadSize: int(i.UDPPayload), - Serializer: i.newSerializer(), + Serializer: serializer, Log: i.Log, } @@ -190,6 +195,11 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url. return nil, err } + serializer := &influx.Serializer{UintSupport: true} + if err := serializer.Init(); err != nil { + return nil, err + } + httpConfig := &HTTPConfig{ URL: address, Timeout: time.Duration(i.Timeout), @@ -208,7 +218,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url. RetentionPolicyTag: i.RetentionPolicyTag, ExcludeRetentionPolicyTag: i.ExcludeRetentionPolicyTag, Consistency: i.WriteConsistency, - Serializer: i.newSerializer(), + Serializer: serializer, Log: i.Log, } @@ -228,15 +238,6 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url. return c, nil } -func (i *InfluxDB) newSerializer() *influx.Serializer { - serializer := influx.NewSerializer() - if i.InfluxUintSupport { - serializer.SetFieldTypeSupport(influx.UintSupport) - } - - return serializer -} - func init() { outputs.Add("influxdb", func() telegraf.Output { return &InfluxDB{ diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index 3cd3ebbe8..48c8cab7e 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -46,10 +46,12 @@ func NewUDPClient(config UDPConfig) (*udpClient, error) { serializer := config.Serializer if serializer == nil { - s := influx.NewSerializer() - serializer = s + serializer = &influx.Serializer{UintSupport: true} + if err := serializer.Init(); err != nil { + return nil, err + } } - serializer.SetMaxLineBytes(size) + serializer.MaxLineBytes = size dialer := config.Dialer if dialer == nil { diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 47ae70474..e0ee2ec5f 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -113,7 +113,10 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) { serializer := cfg.Serializer if serializer == nil { - serializer = influx.NewSerializer() + serializer = &influx.Serializer{} + if err := serializer.Init(); err != nil { + return nil, err + } } var transport *http.Transport diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index 05654d5c3..1a46a586a 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -125,6 +125,11 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro return nil, err } + serializer := &influx.Serializer{UintSupport: true} + if err := serializer.Init(); err != nil { + return nil, err + } + httpConfig := &HTTPConfig{ URL: address, Token: i.Token, @@ -138,7 +143,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro UserAgent: i.UserAgent, ContentEncoding: i.ContentEncoding, TLSConfig: tlsConfig, - Serializer: i.newSerializer(), + Serializer: serializer, Log: i.Log, } @@ -150,15 +155,6 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro return c, nil } -func (i *InfluxDB) newSerializer() *influx.Serializer { - serializer := influx.NewSerializer() - if i.UintSupport { - serializer.SetFieldTypeSupport(influx.UintSupport) - } - - return serializer -} - func init() { outputs.Add("influxdb_v2", func() telegraf.Output { return &InfluxDB{ diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 822f320bb..821153db5 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -13,7 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -72,7 +72,9 @@ func TestConnectAndWriteIntegration(t *testing.T) { fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), } - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) + k := &Kafka{ Brokers: brokers, Topic: "Test", @@ -330,7 +332,8 @@ func TestTopicTag(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tt.plugin.Log = testutil.Logger{} - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) tt.plugin.SetSerializer(s) err := tt.plugin.Connect() diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 2dc8f4a6d..7c403abab 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -209,7 +209,9 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { } func TestWrite_NoMetrics(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + svc := &mockKinesisPutRecords{} k := KinesisOutput{ @@ -230,7 +232,8 @@ func TestWrite_NoMetrics(t *testing.T) { } func TestWrite_SingleMetric(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(1, 0) @@ -264,7 +267,8 @@ func TestWrite_SingleMetric(t *testing.T) { } func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(3, 0) @@ -295,7 +299,8 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { } func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -326,7 +331,8 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { } func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -364,7 +370,8 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { } func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -402,7 +409,8 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { } func TestWrite_SerializerError(t *testing.T) { - serializer := influx.NewSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(2, 0) diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 275fdbf4a..92f4a3b32 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -17,7 +17,7 @@ import ( "github.com/influxdata/telegraf/plugins/common/mqtt" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/value" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -49,7 +49,8 @@ func TestConnectAndWriteIntegration(t *testing.T) { container := launchTestContainer(t) defer container.Terminate() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s := serializers.NewInfluxSerializer() + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) m := &MQTT{ MqttConfig: mqtt.MqttConfig{ Servers: []string{url}, @@ -79,7 +80,9 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) { defer container.Terminate() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s := serializers.NewInfluxSerializer() + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) + m := &MQTT{ MqttConfig: mqtt.MqttConfig{ Servers: []string{url}, @@ -109,7 +112,10 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { container := launchTestContainer(t) defer container.Terminate() - var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) + url := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) + m := &MQTT{ MqttConfig: mqtt.MqttConfig{ Servers: []string{url}, @@ -117,7 +123,7 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { KeepAlive: 30, Timeout: config.Duration(5 * time.Second), }, - serializer: serializers.NewInfluxSerializer(), + serializer: s, Log: testutil.Logger{Name: "mqttv5-integration-test"}, } @@ -151,7 +157,8 @@ func TestIntegrationMQTTv3(t *testing.T) { // Setup the parser / serializer pair parser := &influx.Parser{} require.NoError(t, parser.Init()) - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) // Setup the plugin url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) @@ -265,7 +272,9 @@ func TestMQTTv5Properties(t *testing.T) { } // Setup the metric serializer - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) + plugin.SetSerializer(serializer) // Verify that we can connect to the MQTT broker @@ -300,7 +309,8 @@ func TestIntegrationMQTTLayoutNonBatch(t *testing.T) { // Setup the parser / serializer pair parser := &influx.Parser{} require.NoError(t, parser.Init()) - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) // Setup the plugin url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) @@ -386,7 +396,8 @@ func TestIntegrationMQTTLayoutBatch(t *testing.T) { // Setup the parser / serializer pair parser := &influx.Parser{} require.NoError(t, parser.Init()) - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) // Setup the plugin url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) @@ -858,7 +869,9 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { } func TestGenerateTopicName(t *testing.T) { - s := serializers.NewInfluxSerializer() + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) + m := &MQTT{ MqttConfig: mqtt.MqttConfig{ Servers: []string{"tcp://localhost:1883"}, diff --git a/plugins/outputs/nats/nats_test.go b/plugins/outputs/nats/nats_test.go index 2d860b016..85b8caaf7 100644 --- a/plugins/outputs/nats/nats_test.go +++ b/plugins/outputs/nats/nats_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -27,12 +27,13 @@ func TestConnectAndWriteIntegration(t *testing.T) { defer container.Terminate() server := []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports[servicePort])} - s := serializers.NewInfluxSerializer() + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) n := &NATS{ Servers: server, Name: "telegraf", Subject: "telegraf", - serializer: s, + serializer: serializer, } // Verify that we can connect to the NATS daemon diff --git a/plugins/outputs/nsq/nsq_test.go b/plugins/outputs/nsq/nsq_test.go index 89ec90847..afad5469b 100644 --- a/plugins/outputs/nsq/nsq_test.go +++ b/plugins/outputs/nsq/nsq_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" - "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -29,7 +29,8 @@ func TestConnectAndWriteIntegration(t *testing.T) { defer container.Terminate() server := []string{fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])} - s := serializers.NewInfluxSerializer() + s := &influx.Serializer{} + require.NoError(t, s.Init()) n := &NSQ{ Server: server[0], Topic: "telegraf", diff --git a/plugins/outputs/socket_writer/socket_writer.go b/plugins/outputs/socket_writer/socket_writer.go index f8f3fbb2e..b45fe685d 100644 --- a/plugins/outputs/socket_writer/socket_writer.go +++ b/plugins/outputs/socket_writer/socket_writer.go @@ -144,13 +144,8 @@ func (sw *SocketWriter) Close() error { return err } -func newSocketWriter() *SocketWriter { - s := serializers.NewInfluxSerializer() - return &SocketWriter{ - Serializer: s, - } -} - func init() { - outputs.Add("socket_writer", func() telegraf.Output { return newSocketWriter() }) + outputs.Add("socket_writer", func() telegraf.Output { + return &SocketWriter{} + }) } diff --git a/plugins/outputs/socket_writer/socket_writer_test.go b/plugins/outputs/socket_writer/socket_writer_test.go index a10f71489..c69359a05 100644 --- a/plugins/outputs/socket_writer/socket_writer_test.go +++ b/plugins/outputs/socket_writer/socket_writer_test.go @@ -10,18 +10,25 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) +func newSocketWriter(addr string) *SocketWriter { + serializer := &influx.Serializer{} + _ = serializer.Init() + return &SocketWriter{ + Address: addr, + Serializer: serializer, + } +} + func TestSocketWriter_tcp(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "tcp://" + listener.Addr().String() - - err = sw.Connect() - require.NoError(t, err) + sw := newSocketWriter("tcp://" + listener.Addr().String()) + require.NoError(t, sw.Connect()) lconn, err := listener.Accept() require.NoError(t, err) @@ -33,11 +40,8 @@ func TestSocketWriter_udp(t *testing.T) { listener, err := net.ListenPacket("udp", "127.0.0.1:0") require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "udp://" + listener.LocalAddr().String() - - err = sw.Connect() - require.NoError(t, err) + sw := newSocketWriter("udp://" + listener.LocalAddr().String()) + require.NoError(t, sw.Connect()) testSocketWriterPacket(t, sw, listener) } @@ -48,11 +52,8 @@ func TestSocketWriter_unix(t *testing.T) { listener, err := net.Listen("unix", sock) require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "unix://" + sock - - err = sw.Connect() - require.NoError(t, err) + sw := newSocketWriter("unix://" + sock) + require.NoError(t, sw.Connect()) lconn, err := listener.Accept() require.NoError(t, err) @@ -70,11 +71,8 @@ func TestSocketWriter_unixgram(t *testing.T) { listener, err := net.ListenPacket("unixgram", sock) require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "unixgram://" + sock - - err = sw.Connect() - require.NoError(t, err) + sw := newSocketWriter("unixgram://" + sock) + require.NoError(t, sw.Connect()) testSocketWriterPacket(t, sw, listener) } @@ -132,13 +130,9 @@ func TestSocketWriter_Write_err(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "tcp://" + listener.Addr().String() - - err = sw.Connect() - require.NoError(t, err) - err = sw.Conn.(*net.TCPConn).SetReadBuffer(256) - require.NoError(t, err) + sw := newSocketWriter("tcp://" + listener.Addr().String()) + require.NoError(t, sw.Connect()) + require.NoError(t, sw.Conn.(*net.TCPConn).SetReadBuffer(256)) lconn, err := listener.Accept() require.NoError(t, err) @@ -163,13 +157,9 @@ func TestSocketWriter_Write_reconnect(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "tcp://" + listener.Addr().String() - - err = sw.Connect() - require.NoError(t, err) - err = sw.Conn.(*net.TCPConn).SetReadBuffer(256) - require.NoError(t, err) + sw := newSocketWriter("tcp://" + listener.Addr().String()) + require.NoError(t, sw.Connect()) + require.NoError(t, sw.Conn.(*net.TCPConn).SetReadBuffer(256)) lconn, err := listener.Accept() require.NoError(t, err) @@ -206,12 +196,9 @@ func TestSocketWriter_udp_gzip(t *testing.T) { listener, err := net.ListenPacket("udp", "127.0.0.1:0") require.NoError(t, err) - sw := newSocketWriter() - sw.Address = "udp://" + listener.LocalAddr().String() + sw := newSocketWriter("udp://" + listener.LocalAddr().String()) sw.ContentEncoding = "gzip" - - err = sw.Connect() - require.NoError(t, err) + require.NoError(t, sw.Connect()) testSocketWriterPacket(t, sw, listener) } diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md index b8185d68e..63010abe2 100644 --- a/plugins/processors/execd/README.md +++ b/plugins/processors/execd/README.md @@ -65,12 +65,16 @@ import ( "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func main() { parser := influx.NewStreamParser(os.Stdin) - serializer, _ := serializers.NewInfluxSerializer() + serializer := influxSerializer.Serializer{} + if err := serializer.Init(); err != nil { + fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err) + os.Exit(1) + } for { metric, err := parser.Next() diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 7d746c21c..339bd9146 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -13,7 +13,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/serializers" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -152,7 +152,8 @@ func TestMain(m *testing.M) { func runCountMultiplierProgram() { fieldName := os.Getenv("FIELD_NAME") parser := influx.NewStreamParser(os.Stdin) - serializer := serializers.NewInfluxSerializer() + serializer := &influxSerializer.Serializer{} + _ = serializer.Init() // this should always succeed for { m, err := parser.Next() diff --git a/plugins/processors/printer/printer.go b/plugins/processors/printer/printer.go index 6165f79a7..8aec8faca 100644 --- a/plugins/processors/printer/printer.go +++ b/plugins/processors/printer/printer.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/processors" - "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -15,13 +14,18 @@ import ( var sampleConfig string type Printer struct { - serializer serializers.Serializer + serializer *influx.Serializer } func (*Printer) SampleConfig() string { return sampleConfig } +func (p *Printer) Init() error { + p.serializer = &influx.Serializer{} + return p.serializer.Init() +} + func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { for _, metric := range in { octets, err := p.serializer.Serialize(metric) @@ -35,8 +39,6 @@ func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { func init() { processors.Add("printer", func() telegraf.Processor { - return &Printer{ - serializer: influx.NewSerializer(), - } + return &Printer{} }) } diff --git a/plugins/serializers/all/all.go b/plugins/serializers/all/all.go new file mode 100644 index 000000000..1a6c64721 --- /dev/null +++ b/plugins/serializers/all/all.go @@ -0,0 +1 @@ +package all diff --git a/plugins/serializers/all/influx.go b/plugins/serializers/all/influx.go new file mode 100644 index 000000000..000ae6b03 --- /dev/null +++ b/plugins/serializers/all/influx.go @@ -0,0 +1,7 @@ +//go:build !custom || serializers || serializers.influx + +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/serializers/influx" // register plugin +) diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index d5e8e759b..f2b290e5a 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -12,24 +12,11 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers" ) -const MaxInt64 = int64(^uint64(0) >> 1) - -type FieldSortOrder int - const ( - NoSortFields FieldSortOrder = iota - SortFields -) - -type FieldTypeSupport int - -const ( - UintSupport FieldTypeSupport = 1 << iota -) - -var ( + MaxInt64 = int64(^uint64(0) >> 1) NeedMoreSpace = "need more space" InvalidName = "invalid name" NoFields = "no serializable fields" @@ -59,10 +46,11 @@ func (e FieldError) Error() string { // Serializer is a serializer for line protocol. type Serializer struct { - maxLineBytes int - bytesWritten int - fieldSortOrder FieldSortOrder - fieldTypeSupport FieldTypeSupport + MaxLineBytes int `toml:"influx_max_line_bytes"` + SortFields bool `toml:"influx_sort_fields"` + UintSupport bool `toml:"influx_uint_support"` + + bytesWritten int buf bytes.Buffer header []byte @@ -70,27 +58,12 @@ type Serializer struct { pair []byte } -func NewSerializer() *Serializer { - serializer := &Serializer{ - fieldSortOrder: NoSortFields, +func (s *Serializer) Init() error { + s.header = make([]byte, 0, 50) + s.footer = make([]byte, 0, 21) + s.pair = make([]byte, 0, 50) - header: make([]byte, 0, 50), - footer: make([]byte, 0, 21), - pair: make([]byte, 0, 50), - } - return serializer -} - -func (s *Serializer) SetMaxLineBytes(maxLineBytes int) { - s.maxLineBytes = maxLineBytes -} - -func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) { - s.fieldSortOrder = order -} - -func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) { - s.fieldTypeSupport = typeSupport + return nil } // Serialize writes the telegraf.Metric to a byte slice. May produce multiple @@ -215,7 +188,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { s.buildFooter(m) - if s.fieldSortOrder == SortFields { + if s.SortFields { sort.Slice(m.FieldList(), func(i, j int) bool { return m.FieldList()[i].Key < m.FieldList()[j].Key }) @@ -239,7 +212,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { bytesNeeded++ } - if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { + if s.MaxLineBytes > 0 && bytesNeeded > s.MaxLineBytes { // Need at least one field per line, this metric cannot be fit // into the max line bytes. if firstField { @@ -255,7 +228,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { firstField = true bytesNeeded = len(s.header) + len(s.pair) + len(s.footer) - if bytesNeeded > s.maxLineBytes { + if bytesNeeded > s.MaxLineBytes { return s.newMetricError(NeedMoreSpace) } } @@ -299,7 +272,7 @@ func (s *Serializer) newMetricError(reason string) *MetricError { func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) { switch v := value.(type) { case uint64: - if s.fieldTypeSupport&UintSupport != 0 { + if s.UintSupport { return appendUintField(buf, v), nil } if v <= uint64(MaxInt64) { @@ -349,3 +322,20 @@ func appendStringField(buf []byte, value string) []byte { buf = append(buf, '"') return buf } + +func init() { + serializers.Add("influx", + func() serializers.Serializer { + return &Serializer{} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (s *Serializer) InitFromConfig(cfg *serializers.Config) error { + s.MaxLineBytes = cfg.InfluxMaxLineBytes + s.SortFields = cfg.InfluxSortFields + s.UintSupport = cfg.InfluxUintSupport + + return nil +} diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index 7aecda607..7bf3e60f8 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -14,7 +14,7 @@ import ( var tests = []struct { name string maxBytes int - typeSupport FieldTypeSupport + uintSupport bool input telegraf.Metric output []byte errReason string @@ -132,7 +132,7 @@ var tests = []struct { time.Unix(0, 0), ), output: []byte("cpu value=42u 0\n"), - typeSupport: UintSupport, + uintSupport: true, }, { name: "uint field max value", @@ -145,7 +145,7 @@ var tests = []struct { time.Unix(0, 0), ), output: []byte("cpu value=18446744073709551615u 0\n"), - typeSupport: UintSupport, + uintSupport: true, }, { name: "uint field no uint support", @@ -481,10 +481,11 @@ var tests = []struct { func TestSerializer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - serializer := NewSerializer() - serializer.SetMaxLineBytes(tt.maxBytes) - serializer.SetFieldSortOrder(SortFields) - serializer.SetFieldTypeSupport(tt.typeSupport) + serializer := &Serializer{ + MaxLineBytes: tt.maxBytes, + SortFields: true, + UintSupport: tt.uintSupport, + } output, err := serializer.Serialize(tt.input) if tt.errReason != "" { require.Error(t, err) @@ -498,9 +499,10 @@ func TestSerializer(t *testing.T) { func BenchmarkSerializer(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - serializer := NewSerializer() - serializer.SetMaxLineBytes(tt.maxBytes) - serializer.SetFieldTypeSupport(tt.typeSupport) + serializer := &Serializer{ + MaxLineBytes: tt.maxBytes, + UintSupport: tt.uintSupport, + } for n := 0; n < b.N; n++ { output, err := serializer.Serialize(tt.input) _ = err @@ -522,8 +524,9 @@ func TestSerialize_SerializeBatch(t *testing.T) { metrics := []telegraf.Metric{m, m} - serializer := NewSerializer() - serializer.SetFieldSortOrder(SortFields) + serializer := &Serializer{ + SortFields: true, + } output, err := serializer.SerializeBatch(metrics) require.NoError(t, err) require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output) diff --git a/plugins/serializers/influx/reader.go b/plugins/serializers/influx/reader.go index c506afde7..830990103 100644 --- a/plugins/serializers/influx/reader.go +++ b/plugins/serializers/influx/reader.go @@ -23,7 +23,7 @@ func NewReader(metrics []telegraf.Metric, serializer *Serializer) io.Reader { metrics: metrics, serializer: serializer, offset: 0, - buf: bytes.NewBuffer(make([]byte, 0, serializer.maxLineBytes)), + buf: bytes.NewBuffer(make([]byte, 0, serializer.MaxLineBytes)), } } diff --git a/plugins/serializers/influx/reader_test.go b/plugins/serializers/influx/reader_test.go index 569e8be98..3a2254774 100644 --- a/plugins/serializers/influx/reader_test.go +++ b/plugins/serializers/influx/reader_test.go @@ -127,9 +127,10 @@ func TestReader(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - serializer := NewSerializer() - serializer.SetMaxLineBytes(tt.maxLineBytes) - serializer.SetFieldSortOrder(SortFields) + serializer := &Serializer{ + MaxLineBytes: tt.maxLineBytes, + SortFields: true, + } reader := NewReader(tt.input, serializer) data := new(bytes.Buffer) @@ -161,8 +162,9 @@ func TestZeroLengthBufferNoError(t *testing.T) { }, time.Unix(0, 0), ) - serializer := NewSerializer() - serializer.SetFieldSortOrder(SortFields) + serializer := &Serializer{ + SortFields: true, + } reader := NewReader([]telegraf.Metric{m}, serializer) readbuf := make([]byte, 0) @@ -243,7 +245,7 @@ func BenchmarkReader(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { readbuf := make([]byte, 4096) - serializer := NewSerializer() + serializer := &Serializer{} reader := NewReader(metrics, serializer) for { _, err := reader.Read(readbuf) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 9248bcc13..f29e7ee94 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -9,7 +9,6 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/csv" "github.com/influxdata/telegraf/plugins/serializers/graphite" - "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/msgpack" "github.com/influxdata/telegraf/plugins/serializers/nowmetric" @@ -19,6 +18,17 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/wavefront" ) +// Creator is the function to create a new serializer +type Creator func() Serializer + +// Serializers contains the registry of all known serializers (following the new style) +var Serializers = map[string]Creator{} + +// Add adds a serializer to the registry. Usually this function is called in the plugin's init function +func Add(name string, creator Creator) { + Serializers[name] = creator +} + // SerializerOutput is an interface for output plugins that are able to // serialize telegraf metrics into arbitrary data formats. type SerializerOutput interface { @@ -46,6 +56,12 @@ type Serializer interface { SerializeBatch(metrics []telegraf.Metric) ([]byte, error) } +// SerializerCompatibility is an interface for backward-compatible initialization of serializers +type SerializerCompatibility interface { + // InitFromConfig sets the serializers internal variables from the old-style config + InitFromConfig(config *Config) error +} + // Config is a struct that covers the data types needed for all serializer types, // and can be used to instantiate _any_ of the serializers. type Config struct { @@ -153,8 +169,6 @@ func NewSerializer(config *Config) (Serializer, error) { switch config.DataFormat { case "csv": serializer, err = NewCSVSerializer(config) - case "influx": - serializer, err = NewInfluxSerializerConfig(config), nil case "graphite": serializer, err = NewGraphiteSerializer( config.Prefix, @@ -187,7 +201,20 @@ func NewSerializer(config *Config) (Serializer, error) { case "msgpack": serializer, err = NewMsgpackSerializer(), nil default: - err = fmt.Errorf("invalid data format: %s", config.DataFormat) + creator, found := Serializers[config.DataFormat] + if !found { + return nil, fmt.Errorf("invalid data format: %s", config.DataFormat) + } + + // Try to create new-style serializers the old way... + serializer := creator() + p, ok := serializer.(SerializerCompatibility) + if !ok { + return nil, fmt.Errorf("serializer for %q cannot be created the old way", config.DataFormat) + } + err := p.InitFromConfig(config) + + return serializer, err } return serializer, err } @@ -263,28 +290,6 @@ func NewNowSerializer() (Serializer, error) { return nowmetric.NewSerializer() } -func NewInfluxSerializerConfig(config *Config) Serializer { - var sort influx.FieldSortOrder - if config.InfluxSortFields { - sort = influx.SortFields - } - - var typeSupport influx.FieldTypeSupport - if config.InfluxUintSupport { - typeSupport = typeSupport + influx.UintSupport - } - - s := influx.NewSerializer() - s.SetMaxLineBytes(config.InfluxMaxLineBytes) - s.SetFieldSortOrder(sort) - s.SetFieldTypeSupport(typeSupport) - return s -} - -func NewInfluxSerializer() Serializer { - return influx.NewSerializer() -} - //nolint:revive //argument-limit conditionally more arguments allowed func NewGraphiteSerializer( prefix, diff --git a/serializer.go b/serializer.go new file mode 100644 index 000000000..dcb151e42 --- /dev/null +++ b/serializer.go @@ -0,0 +1,28 @@ +package telegraf + +// SerializerPlugin is an interface for plugins that are able to +// serialize telegraf metrics into arbitrary data formats. +type SerializerPlugin interface { + // SetSerializer sets the serializer function for the interface. + SetSerializer(serializer Serializer) +} + +// Serializer is an interface defining functions that a serializer plugin must +// satisfy. +// +// Implementations of this interface should be reentrant but are not required +// to be thread-safe. +type Serializer interface { + // Serialize takes a single telegraf metric and turns it into a byte buffer. + // separate metrics should be separated by a newline, and there should be + // a newline at the end of the buffer. + // + // New plugins should use SerializeBatch instead to allow for non-line + // delimited metrics. + Serialize(metric Metric) ([]byte, error) + + // SerializeBatch takes an array of telegraf metric and serializes it into + // a byte buffer. This method is not required to be suitable for use with + // line oriented framing. + SerializeBatch(metrics []Metric) ([]byte, error) +} diff --git a/tools/custom_builder/config.go b/tools/custom_builder/config.go index 0567bb137..29748a044 100644 --- a/tools/custom_builder/config.go +++ b/tools/custom_builder/config.go @@ -68,9 +68,10 @@ func (s *selection) Filter(p packageCollection) *packageCollection { enabled.packages[category] = categoryEnabledPackages } - // Make sure we update the list of default parsers used by + // Make sure we update the list of default parsers and serializers used by // the remaining packages enabled.FillDefaultParsers() + enabled.FillDefaultSerializers() // If the user did not configure any parser, we want to include // the default parsers if any to preserve a functional set of @@ -88,6 +89,21 @@ func (s *selection) Filter(p packageCollection) *packageCollection { enabled.packages["parsers"] = parsers } + // If the user did not configure any serializer, we want to include + // the default one if any to preserve a functional set of plugins. + if len(enabled.packages["serializers"]) == 0 && len(enabled.defaultSerializers) > 0 { + var serializers []packageInfo + for _, pkg := range p.packages["serializers"] { + for _, name := range enabled.defaultSerializers { + if pkg.Plugin == name { + serializers = append(serializers, pkg) + break + } + } + } + enabled.packages["serializers"] = serializers + } + return &enabled } diff --git a/tools/custom_builder/main.go b/tools/custom_builder/main.go index 69206e78f..774d0d549 100644 --- a/tools/custom_builder/main.go +++ b/tools/custom_builder/main.go @@ -19,6 +19,7 @@ var categories = []string{ "parsers", "processors", "secretstores", + "serializers", } const description = ` diff --git a/tools/custom_builder/packages.go b/tools/custom_builder/packages.go index 612866af9..5a952915c 100644 --- a/tools/custom_builder/packages.go +++ b/tools/custom_builder/packages.go @@ -27,16 +27,18 @@ var packageFilter = filter.MustCompile([]string{ }) type packageInfo struct { - Category string - Plugin string - Path string - Tag string - DefaultParser string + Category string + Plugin string + Path string + Tag string + DefaultParser string + DefaultSerializer string } type packageCollection struct { - packages map[string][]packageInfo - defaultParsers []string + packages map[string][]packageInfo + defaultParsers []string + defaultSerializers []string } // Define the package exceptions @@ -100,7 +102,8 @@ func (p *packageCollection) collectPackagesForCategory(category string) error { } // Extract potential default parsers for input and processor packages - var defaultParser string + // as well as serializers for the output package + var defaultParser, defaultSerializer string switch category { case "inputs", "processors": var err error @@ -108,17 +111,24 @@ func (p *packageCollection) collectPackagesForCategory(category string) error { if err != nil { log.Printf("Getting default parser for %s.%s failed: %v", category, name, err) } + case "outputs": + var err error + defaultSerializer, err = extractDefaultSerializer(path) + if err != nil { + log.Printf("Getting default serializer for %s.%s failed: %v", category, name, err) + } } for _, plugin := range registeredNames { path := filepath.Join("plugins", category, element.Name()) tag := category + "." + element.Name() entries = append(entries, packageInfo{ - Category: category, - Plugin: plugin, - Path: filepath.ToSlash(path), - Tag: tag, - DefaultParser: defaultParser, + Category: category, + Plugin: plugin, + Path: filepath.ToSlash(path), + Tag: tag, + DefaultParser: defaultParser, + DefaultSerializer: defaultSerializer, }) } } @@ -148,6 +158,26 @@ func (p *packageCollection) FillDefaultParsers() { } } +func (p *packageCollection) FillDefaultSerializers() { + // Make sure we ignore all empty-named parsers which indicate + // that there is no parser used by the plugin. + serializers := map[string]bool{"": true} + + // Iterate over all plugins that may have parsers and collect + // the defaults + p.defaultSerializers = make([]string, 0) + for _, category := range []string{"outputs"} { + for _, pkg := range p.packages[category] { + name := pkg.DefaultSerializer + if seen := serializers[name]; seen { + continue + } + p.defaultSerializers = append(p.defaultSerializers, name) + serializers[name] = true + } + } +} + func (p *packageCollection) CollectAvailable() error { p.packages = make(map[string][]packageInfo) @@ -158,6 +188,7 @@ func (p *packageCollection) CollectAvailable() error { } p.FillDefaultParsers() + p.FillDefaultSerializers() return nil } @@ -352,3 +383,35 @@ func extractDefaultParser(pluginDir string) (string, error) { return "", nil } + +func extractDefaultSerializer(pluginDir string) (string, error) { + re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`) + + // Walk all config files in the package directory + elements, err := os.ReadDir(pluginDir) + if err != nil { + return "", err + } + + for _, element := range elements { + path := filepath.Join(pluginDir, element.Name()) + if element.IsDir() || filepath.Ext(element.Name()) != ".conf" { + continue + } + + // Read the config and search for a "data_format" entry + file, err := os.Open(path) + if err != nil { + return "", err + } + scanner := bufio.NewScanner(file) + for scanner.Scan() { + match := re.FindStringSubmatch(scanner.Text()) + if len(match) == 2 { + return match[1], nil + } + } + } + + return "", nil +}