chore(serializers): Add new-style framework and migrate influx (#12920)

This commit is contained in:
Sven Rebhan 2023-04-11 21:52:42 +02:00 committed by GitHub
parent eaa19307e0
commit 9bb2d1562d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 915 additions and 279 deletions

View File

@ -961,9 +961,7 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
wg.Add(1)
go func() {
defer wg.Done()
s := influx.NewSerializer()
s.SetFieldSortOrder(influx.SortFields)
s := &influx.Serializer{SortFields: true}
for metric := range src {
octets, err := s.Serialize(metric)
if err == nil {

View File

@ -23,6 +23,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
_ "github.com/influxdata/telegraf/plugins/secretstores/all"
_ "github.com/influxdata/telegraf/plugins/serializers/all"
)
type TelegrafConfig interface {

View File

@ -952,16 +952,45 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
}
}
conf := c.buildParser(parentname, table)
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}
conf := &models.ParserConfig{
Parent: parentname,
DataFormat: dataformat,
}
running := models.NewRunningParser(parser, conf)
err := running.Init()
return running, err
}
func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
if dataformat == "" {
dataformat = "influx"
}
creator, ok := serializers.Serializers[dataformat]
if !ok {
return nil, fmt.Errorf("undefined but requested serializer: %s", dataformat)
}
serializer := creator()
if err := c.toml.UnmarshalTable(table, serializer); err != nil {
return nil, err
}
conf := &models.SerializerConfig{
Parent: parentname,
DataFormat: dataformat,
}
running := models.NewRunningSerializer(serializer, conf)
err := running.Init()
return running, err
}
func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
@ -1070,6 +1099,18 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
}
// For inputs with parsers we need to compute the set of
// options that is not covered by both, the parser and the input.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
creator, ok := outputs.Outputs[name]
if !ok {
// Handle removed, deprecated plugins
@ -1083,12 +1124,34 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
// If the output has a SetSerializer function, then this means it can write
// arbitrary types of output, so build the serializer and set it.
if t, ok := output.(serializers.SerializerOutput); ok {
serializer, err := c.buildSerializer(table)
if err != nil {
return err
if t, ok := output.(telegraf.SerializerPlugin); ok {
missThreshold = 1
if serializer, err := c.addSerializer(name, table); err == nil {
t.SetSerializer(serializer)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
serializer, err := c.buildSerializerOld(table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}
} else if t, ok := output.(serializers.SerializerOutput); ok {
// Keep the old interface for backward compatibility
// DEPRECATED: Please switch your plugin to telegraf.Serializers
missThreshold = 1
if serializer, err := c.addSerializer(name, table); err == nil {
t.SetSerializer(serializer)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
serializer, err := c.buildSerializerOld(table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}
t.SetSerializer(serializer)
}
outputConfig, err := c.buildOutput(name, table)
@ -1110,8 +1173,19 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
}
}
// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
return err
}
}
ro := models.NewRunningOutput(output, outputConfig, c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
c.Outputs = append(c.Outputs, ro)
return nil
}
@ -1205,10 +1279,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
}
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)
// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missCountThreshold {
@ -1219,6 +1289,10 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
}
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)
return nil
}
@ -1266,21 +1340,6 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
return conf, err
}
// buildParser parses Parser specific items from the ast.Table,
// builds the filter and returns a
// models.ParserConfig to be inserted into models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig {
var dataFormat string
c.getFieldString(tbl, "data_format", &dataFormat)
conf := &models.ParserConfig{
Parent: name,
DataFormat: dataFormat,
}
return conf
}
// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
@ -1376,10 +1435,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
return cp, err
}
// buildSerializer grabs the necessary entries from the ast.Table for creating
// buildSerializerOld grabs the necessary entries from the ast.Table for creating
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) {
func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) {
sc := &serializers.Config{TimestampUnits: 1 * time.Second}
c.getFieldString(tbl, "data_format", &sc.DataFormat)
@ -1524,6 +1583,7 @@ func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
root = root || pt.Implements(reflect.TypeOf((*telegraf.Aggregator)(nil)).Elem())
root = root || pt.Implements(reflect.TypeOf((*telegraf.Processor)(nil)).Elem())
root = root || pt.Implements(reflect.TypeOf((*telegraf.Parser)(nil)).Elem())
root = root || pt.Implements(reflect.TypeOf((*telegraf.Serializer)(nil)).Elem())
c, ok := counter[key]
if !root {

View File

@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"reflect"
"regexp"
"runtime"
"strings"
"sync"
@ -29,6 +30,8 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
_ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing
)
func TestReadBinaryFile(t *testing.T) {
@ -511,6 +514,184 @@ func TestConfig_URLLikeFileName(t *testing.T) {
}
}
func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
formats := []string{
"carbon2",
"csv",
"graphite",
"influx",
"json",
"msgpack",
"nowmetric",
"prometheus",
"prometheusremotewrite",
"splunkmetric",
"wavefront",
}
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/serializers_new.toml"))
require.Len(t, c.Outputs, len(formats))
cfg := serializers.Config{}
override := map[string]struct {
param map[string]interface{}
mask []string
}{}
expected := make([]telegraf.Serializer, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
formatCfg.DataFormat = format
logger := models.NewLogger("serializers", format, "test")
var serializer telegraf.Serializer
if creator, found := serializers.Serializers[format]; found {
serializer = creator()
} else {
var err error
serializer, err = serializers.NewSerializer(formatCfg)
require.NoErrorf(t, err, "No serializer for format %q", format)
}
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(serializer))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(serializer, logger)
if s, ok := serializer.(telegraf.Initializer); ok {
require.NoError(t, s.Init())
}
expected = append(expected, serializer)
}
require.Len(t, expected, len(formats))
actual := make([]interface{}, 0)
for _, plugin := range c.Outputs {
output, ok := plugin.Output.(*MockupOutputPluginSerializerNew)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := output.Serializer.(*models.RunningSerializer); ok {
actual = append(actual, p.Serializer)
} else {
actual = append(actual, output.Serializer)
}
}
require.Len(t, actual, len(formats))
for i, format := range formats {
// Determine the underlying type of the serializer
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
if settings, found := override[format]; found {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}
// Do a manual comparision as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetSerializer() for %q", format)
}
}
func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
formats := []string{
"carbon2",
"csv",
"graphite",
"influx",
"json",
"msgpack",
"nowmetric",
"prometheus",
"prometheusremotewrite",
"splunkmetric",
"wavefront",
}
c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/serializers_old.toml"))
require.Len(t, c.Outputs, len(formats))
cfg := serializers.Config{}
override := map[string]struct {
param map[string]interface{}
mask []string
}{}
expected := make([]telegraf.Serializer, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
formatCfg.DataFormat = format
logger := models.NewLogger("serializers", format, "test")
var serializer serializers.Serializer
if creator, found := serializers.Serializers[format]; found {
serializer = creator()
} else {
var err error
serializer, err = serializers.NewSerializer(formatCfg)
require.NoErrorf(t, err, "No serializer for format %q", format)
}
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(serializer))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(serializer, logger)
if s, ok := serializer.(telegraf.Initializer); ok {
require.NoError(t, s.Init())
}
expected = append(expected, serializer)
}
require.Len(t, expected, len(formats))
actual := make([]interface{}, 0)
for _, plugin := range c.Outputs {
output, ok := plugin.Output.(*MockupOutputPluginSerializerOld)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := output.Serializer.(*models.RunningSerializer); ok {
actual = append(actual, p.Serializer)
} else {
actual = append(actual, output.Serializer)
}
}
require.Len(t, actual, len(formats))
for i, format := range formats {
// Determine the underlying type of the serializer
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
if settings, found := override[format]; found {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}
// Do a manual comparison as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetSerializer() for %q", format)
}
}
func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
formats := []string{
"collectd",
@ -1341,6 +1522,47 @@ func (m *MockupOuputPlugin) Write(_ []telegraf.Metric) error {
return nil
}
/*** Mockup OUTPUT plugin for serializer testing to avoid cyclic dependencies ***/
type MockupOutputPluginSerializerOld struct {
Serializer serializers.Serializer
}
func (m *MockupOutputPluginSerializerOld) SetSerializer(s serializers.Serializer) {
m.Serializer = s
}
func (*MockupOutputPluginSerializerOld) Connect() error {
return nil
}
func (*MockupOutputPluginSerializerOld) Close() error {
return nil
}
func (*MockupOutputPluginSerializerOld) SampleConfig() string {
return "Mockup test output plugin"
}
func (*MockupOutputPluginSerializerOld) Write(_ []telegraf.Metric) error {
return nil
}
type MockupOutputPluginSerializerNew struct {
Serializer telegraf.Serializer
}
func (m *MockupOutputPluginSerializerNew) SetSerializer(s telegraf.Serializer) {
m.Serializer = s
}
func (*MockupOutputPluginSerializerNew) Connect() error {
return nil
}
func (*MockupOutputPluginSerializerNew) Close() error {
return nil
}
func (*MockupOutputPluginSerializerNew) SampleConfig() string {
return "Mockup test output plugin"
}
func (*MockupOutputPluginSerializerNew) Write(_ []telegraf.Metric) error {
return nil
}
/*** Mockup INPUT plugin with state for testing to avoid cyclic dependencies ***/
type MockupState struct {
Name string
@ -1457,4 +1679,10 @@ func init() {
outputs.Add("http", func() telegraf.Output {
return &MockupOuputPlugin{}
})
outputs.Add("serializer_test_new", func() telegraf.Output {
return &MockupOutputPluginSerializerNew{}
})
outputs.Add("serializer_test_old", func() telegraf.Output {
return &MockupOutputPluginSerializerOld{}
})
}

32
config/testdata/serializers_new.toml vendored Normal file
View File

@ -0,0 +1,32 @@
[[outputs.serializer_test_new]]
data_format = "carbon2"
[[outputs.serializer_test_new]]
data_format = "csv"
[[outputs.serializer_test_new]]
data_format = "graphite"
[[outputs.serializer_test_new]]
data_format = "influx"
[[outputs.serializer_test_new]]
data_format = "json"
[[outputs.serializer_test_new]]
data_format = "msgpack"
[[outputs.serializer_test_new]]
data_format = "nowmetric"
[[outputs.serializer_test_new]]
data_format = "prometheus"
[[outputs.serializer_test_new]]
data_format = "prometheusremotewrite"
[[outputs.serializer_test_new]]
data_format = "splunkmetric"
[[outputs.serializer_test_new]]
data_format = "wavefront"

32
config/testdata/serializers_old.toml vendored Normal file
View File

@ -0,0 +1,32 @@
[[outputs.serializer_test_old]]
data_format = "carbon2"
[[outputs.serializer_test_old]]
data_format = "csv"
[[outputs.serializer_test_old]]
data_format = "graphite"
[[outputs.serializer_test_old]]
data_format = "influx"
[[outputs.serializer_test_old]]
data_format = "json"
[[outputs.serializer_test_old]]
data_format = "msgpack"
[[outputs.serializer_test_old]]
data_format = "nowmetric"
[[outputs.serializer_test_old]]
data_format = "prometheus"
[[outputs.serializer_test_old]]
data_format = "prometheusremotewrite"
[[outputs.serializer_test_old]]
data_format = "splunkmetric"
[[outputs.serializer_test_old]]
data_format = "wavefront"

View File

@ -0,0 +1,102 @@
package models
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/selfstat"
)
// SerializerConfig is the common config for all serializers.
type SerializerConfig struct {
Parent string
Alias string
DataFormat string
DefaultTags map[string]string
}
type RunningSerializer struct {
Serializer serializers.Serializer
Config *SerializerConfig
log telegraf.Logger
MetricsSerialized selfstat.Stat
BytesSerialized selfstat.Stat
SerializationTime selfstat.Stat
}
func NewRunningSerializer(serializer serializers.Serializer, config *SerializerConfig) *RunningSerializer {
tags := map[string]string{"type": config.DataFormat}
if config.Alias != "" {
tags["alias"] = config.Alias
}
serializerErrorsRegister := selfstat.Register("serializer", "errors", tags)
logger := NewLogger("serializers", config.DataFormat+"::"+config.Parent, config.Alias)
logger.OnErr(func() {
serializerErrorsRegister.Incr(1)
})
SetLoggerOnPlugin(serializer, logger)
return &RunningSerializer{
Serializer: serializer,
Config: config,
MetricsSerialized: selfstat.Register(
"serializer",
"metrics_serialized",
tags,
),
BytesSerialized: selfstat.Register(
"serializer",
"bytes_serialized",
tags,
),
SerializationTime: selfstat.Register(
"serializer",
"serialization_time_ns",
tags,
),
log: logger,
}
}
func (r *RunningSerializer) LogName() string {
return logName("parsers", r.Config.DataFormat+"::"+r.Config.Parent, r.Config.Alias)
}
func (r *RunningSerializer) Init() error {
if p, ok := r.Serializer.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
start := time.Now()
buf, err := r.Serializer.Serialize(metric)
elapsed := time.Since(start)
r.SerializationTime.Incr(elapsed.Nanoseconds())
r.MetricsSerialized.Incr(1)
r.BytesSerialized.Incr(int64(len(buf)))
return buf, err
}
func (r *RunningSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
start := time.Now()
buf, err := r.Serializer.SerializeBatch(metrics)
elapsed := time.Since(start)
r.SerializationTime.Incr(elapsed.Nanoseconds())
r.MetricsSerialized.Incr(int64(len(metrics)))
r.BytesSerialized.Incr(int64(len(buf)))
return buf, err
}
func (r *RunningSerializer) Log() telegraf.Logger {
return r.log
}

View File

@ -101,7 +101,10 @@ func hasQuit(ctx context.Context) bool {
}
func (s *Shim) writeProcessedMetrics() error {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
if err := serializer.Init(); err != nil {
return fmt.Errorf("creating serializer failed: %w", err)
}
for { //nolint:gosimple // for-select used on purpose
select {
case m, open := <-s.metricCh:

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -33,7 +33,8 @@ func TestOutputShim(t *testing.T) {
wg.Done()
}()
serializer := serializers.NewInfluxSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
m := metric.New("thing",
map[string]string{

View File

@ -13,7 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
)
func TestProcessorShim(t *testing.T) {
@ -52,7 +52,9 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) {
wg.Done()
}()
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
parser := influx.Parser{}
require.NoError(t, parser.Init())

View File

@ -18,7 +18,7 @@ import (
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -206,9 +206,12 @@ func TestMain(m *testing.M) {
func runCounterProgram() error {
envMetricName := os.Getenv("METRIC_NAME")
i := 0
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
if err := serializer.Init(); err != nil {
return err
}
i := 0
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
m := metric.New(envMetricName,

View File

@ -104,7 +104,10 @@ func (s *Shim) Run(pollInterval time.Duration) error {
collectMetricsPrompt := make(chan os.Signal, 1)
listenForCollectMetricsSignals(ctx, collectMetricsPrompt)
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
if err := serializer.Init(); err != nil {
return fmt.Errorf("creating serializer failed: %w", err)
}
for _, input := range s.Inputs {
wrappedInput := inputShim{Input: input}

View File

@ -21,7 +21,7 @@ import (
kafkaOutput "github.com/influxdata/telegraf/plugins/outputs/kafka"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -540,7 +540,8 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s := serializers.NewInfluxSerializer()
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = topic

View File

@ -18,7 +18,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
serializer "github.com/influxdata/telegraf/plugins/serializers/influx"
)
const (
@ -62,7 +62,9 @@ type (
)
func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) {
s := serializers.NewInfluxSerializer()
// Instantiate a Influx line-protocol serializer
s := &serializer.Serializer{}
_ = s.Init() // We can ignore the error as the Init will never fail
metrics := make([]telegraf.Metric, 0, len(testM))
t := &stubTopic{

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -57,7 +57,8 @@ func TestExec(t *testing.T) {
runner: &CommandRunner{},
}
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
e.SetSerializer(s)
require.NoError(t, e.Connect())

View File

@ -18,14 +18,15 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
func TestExternalOutputWorks(t *testing.T) {
influxSerializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
exe, err := os.Executable()
require.NoError(t, err)
@ -34,7 +35,7 @@ func TestExternalOutputWorks(t *testing.T) {
Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
serializer: influxSerializer,
serializer: serializer,
Log: testutil.Logger{},
}
@ -71,7 +72,8 @@ func TestExternalOutputWorks(t *testing.T) {
}
func TestPartiallyUnserializableThrowError(t *testing.T) {
influxSerializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
exe, err := os.Executable()
require.NoError(t, err)
@ -81,7 +83,7 @@ func TestPartiallyUnserializableThrowError(t *testing.T) {
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
IgnoreSerializationError: false,
serializer: influxSerializer,
serializer: serializer,
Log: testutil.Logger{},
}
@ -107,7 +109,8 @@ func TestPartiallyUnserializableThrowError(t *testing.T) {
}
func TestPartiallyUnserializableCanBeSkipped(t *testing.T) {
influxSerializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
exe, err := os.Executable()
require.NoError(t, err)
@ -117,7 +120,7 @@ func TestPartiallyUnserializableCanBeSkipped(t *testing.T) {
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
IgnoreSerializationError: true,
serializer: influxSerializer,
serializer: serializer,
Log: testutil.Logger{},
}

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -21,7 +21,10 @@ const (
func TestFileExistingFile(t *testing.T) {
fh := createFile(t)
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
f := File{
Files: []string{fh.Name()},
serializer: s,
@ -40,7 +43,9 @@ func TestFileExistingFile(t *testing.T) {
}
func TestFileNewFile(t *testing.T) {
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
fh := tmpFile(t)
f := File{
Files: []string{fh},
@ -64,7 +69,9 @@ func TestFileExistingFiles(t *testing.T) {
fh2 := createFile(t)
fh3 := createFile(t)
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
f := File{
Files: []string{fh1.Name(), fh2.Name(), fh3.Name()},
serializer: s,
@ -85,7 +92,9 @@ func TestFileExistingFiles(t *testing.T) {
}
func TestFileNewFiles(t *testing.T) {
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
fh1 := tmpFile(t)
fh2 := tmpFile(t)
fh3 := tmpFile(t)
@ -112,7 +121,9 @@ func TestFileBoth(t *testing.T) {
fh1 := createFile(t)
fh2 := tmpFile(t)
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
f := File{
Files: []string{fh1.Name(), fh2},
serializer: s,
@ -137,7 +148,9 @@ func TestFileStdout(t *testing.T) {
r, w, _ := os.Pipe()
os.Stdout = w
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
f := File{
Files: []string{"stdout"},
serializer: s,

View File

@ -112,7 +112,8 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
if tt.connectError {
@ -175,7 +176,8 @@ func TestHTTPClientConfig(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
if tt.connectError {
@ -267,7 +269,8 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
@ -316,7 +319,8 @@ func TestContentType(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
@ -376,7 +380,8 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
@ -431,7 +436,8 @@ func TestBasicAuth(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Connect())
require.NoError(t, plugin.Write([]telegraf.Metric{getMetric()}))
@ -506,7 +512,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
}
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
@ -604,7 +611,10 @@ func TestOAuthAuthorizationCodeGrant(t *testing.T) {
}
})
tt.plugin.SetSerializer(influx.NewSerializer())
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
require.NoError(t, tt.plugin.Connect())
require.NoError(t, tt.plugin.Write([]telegraf.Metric{getMetric()}))
require.NoError(t, err)
@ -630,7 +640,8 @@ func TestDefaultUserAgent(t *testing.T) {
Method: defaultMethod,
}
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
client.SetSerializer(serializer)
err = client.Connect()
require.NoError(t, err)
@ -652,10 +663,13 @@ func TestBatchedUnbatched(t *testing.T) {
Method: defaultMethod,
}
influxSerializer := &influx.Serializer{}
require.NoError(t, influxSerializer.Init())
jsonSerializer, err := json.NewSerializer(json.FormatConfig{TimestampUnits: time.Second})
require.NoError(t, err)
s := map[string]serializers.Serializer{
"influx": influx.NewSerializer(),
"influx": influxSerializer,
"json": jsonSerializer,
}
@ -726,7 +740,8 @@ func TestAwsCredentials(t *testing.T) {
tt.handler(t, w, r)
})
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)

View File

@ -155,7 +155,10 @@ func NewHTTPClient(cfg HTTPConfig) (*httpClient, error) {
}
if cfg.Serializer == nil {
cfg.Serializer = influx.NewSerializer()
cfg.Serializer = &influx.Serializer{}
if err := cfg.Serializer.Init(); err != nil {
return nil, err
}
}
var transport *http.Transport

View File

@ -169,10 +169,15 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
}
func (i *InfluxDB) udpClient(address *url.URL) (Client, error) {
serializer := &influx.Serializer{UintSupport: true}
if err := serializer.Init(); err != nil {
return nil, err
}
udpConfig := &UDPConfig{
URL: address,
MaxPayloadSize: int(i.UDPPayload),
Serializer: i.newSerializer(),
Serializer: serializer,
Log: i.Log,
}
@ -190,6 +195,11 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.
return nil, err
}
serializer := &influx.Serializer{UintSupport: true}
if err := serializer.Init(); err != nil {
return nil, err
}
httpConfig := &HTTPConfig{
URL: address,
Timeout: time.Duration(i.Timeout),
@ -208,7 +218,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.
RetentionPolicyTag: i.RetentionPolicyTag,
ExcludeRetentionPolicyTag: i.ExcludeRetentionPolicyTag,
Consistency: i.WriteConsistency,
Serializer: i.newSerializer(),
Serializer: serializer,
Log: i.Log,
}
@ -228,15 +238,6 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.
return c, nil
}
func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.InfluxUintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}
return serializer
}
func init() {
outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{

View File

@ -46,10 +46,12 @@ func NewUDPClient(config UDPConfig) (*udpClient, error) {
serializer := config.Serializer
if serializer == nil {
s := influx.NewSerializer()
serializer = s
serializer = &influx.Serializer{UintSupport: true}
if err := serializer.Init(); err != nil {
return nil, err
}
}
serializer.SetMaxLineBytes(size)
serializer.MaxLineBytes = size
dialer := config.Dialer
if dialer == nil {

View File

@ -113,7 +113,10 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
serializer := cfg.Serializer
if serializer == nil {
serializer = influx.NewSerializer()
serializer = &influx.Serializer{}
if err := serializer.Init(); err != nil {
return nil, err
}
}
var transport *http.Transport

View File

@ -125,6 +125,11 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro
return nil, err
}
serializer := &influx.Serializer{UintSupport: true}
if err := serializer.Init(); err != nil {
return nil, err
}
httpConfig := &HTTPConfig{
URL: address,
Token: i.Token,
@ -138,7 +143,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro
UserAgent: i.UserAgent,
ContentEncoding: i.ContentEncoding,
TLSConfig: tlsConfig,
Serializer: i.newSerializer(),
Serializer: serializer,
Log: i.Log,
}
@ -150,15 +155,6 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro
return c, nil
}
func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.UintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}
return serializer
}
func init() {
outputs.Add("influxdb_v2", func() telegraf.Output {
return &InfluxDB{

View File

@ -13,7 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -72,7 +72,9 @@ func TestConnectAndWriteIntegration(t *testing.T) {
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
k := &Kafka{
Brokers: brokers,
Topic: "Test",
@ -330,7 +332,8 @@ func TestTopicTag(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = testutil.Logger{}
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
tt.plugin.SetSerializer(s)
err := tt.plugin.Connect()

View File

@ -209,7 +209,9 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
}
func TestWrite_NoMetrics(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
k := KinesisOutput{
@ -230,7 +232,8 @@ func TestWrite_NoMetrics(t *testing.T) {
}
func TestWrite_SingleMetric(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(1, 0)
@ -264,7 +267,8 @@ func TestWrite_SingleMetric(t *testing.T) {
}
func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(3, 0)
@ -295,7 +299,8 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
}
func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -326,7 +331,8 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
}
func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -364,7 +370,8 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
}
func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -402,7 +409,8 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
}
func TestWrite_SerializerError(t *testing.T) {
serializer := influx.NewSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(2, 0)

View File

@ -17,7 +17,7 @@ import (
"github.com/influxdata/telegraf/plugins/common/mqtt"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -49,7 +49,8 @@ func TestConnectAndWriteIntegration(t *testing.T) {
container := launchTestContainer(t)
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s := serializers.NewInfluxSerializer()
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
m := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
@ -79,7 +80,9 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s := serializers.NewInfluxSerializer()
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
m := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
@ -109,7 +112,10 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
container := launchTestContainer(t)
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
url := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
m := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
@ -117,7 +123,7 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
KeepAlive: 30,
Timeout: config.Duration(5 * time.Second),
},
serializer: serializers.NewInfluxSerializer(),
serializer: s,
Log: testutil.Logger{Name: "mqttv5-integration-test"},
}
@ -151,7 +157,8 @@ func TestIntegrationMQTTv3(t *testing.T) {
// Setup the parser / serializer pair
parser := &influx.Parser{}
require.NoError(t, parser.Init())
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
@ -265,7 +272,9 @@ func TestMQTTv5Properties(t *testing.T) {
}
// Setup the metric serializer
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
// Verify that we can connect to the MQTT broker
@ -300,7 +309,8 @@ func TestIntegrationMQTTLayoutNonBatch(t *testing.T) {
// Setup the parser / serializer pair
parser := &influx.Parser{}
require.NoError(t, parser.Init())
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
@ -386,7 +396,8 @@ func TestIntegrationMQTTLayoutBatch(t *testing.T) {
// Setup the parser / serializer pair
parser := &influx.Parser{}
require.NoError(t, parser.Init())
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init())
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
@ -858,7 +869,9 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) {
}
func TestGenerateTopicName(t *testing.T) {
s := serializers.NewInfluxSerializer()
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
m := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{"tcp://localhost:1883"},

View File

@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -27,12 +27,13 @@ func TestConnectAndWriteIntegration(t *testing.T) {
defer container.Terminate()
server := []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports[servicePort])}
s := serializers.NewInfluxSerializer()
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
n := &NATS{
Servers: server,
Name: "telegraf",
Subject: "telegraf",
serializer: s,
serializer: serializer,
}
// Verify that we can connect to the NATS daemon

View File

@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -29,7 +29,8 @@ func TestConnectAndWriteIntegration(t *testing.T) {
defer container.Terminate()
server := []string{fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])}
s := serializers.NewInfluxSerializer()
s := &influx.Serializer{}
require.NoError(t, s.Init())
n := &NSQ{
Server: server[0],
Topic: "telegraf",

View File

@ -144,13 +144,8 @@ func (sw *SocketWriter) Close() error {
return err
}
func newSocketWriter() *SocketWriter {
s := serializers.NewInfluxSerializer()
return &SocketWriter{
Serializer: s,
}
}
func init() {
outputs.Add("socket_writer", func() telegraf.Output { return newSocketWriter() })
outputs.Add("socket_writer", func() telegraf.Output {
return &SocketWriter{}
})
}

View File

@ -10,18 +10,25 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
func newSocketWriter(addr string) *SocketWriter {
serializer := &influx.Serializer{}
_ = serializer.Init()
return &SocketWriter{
Address: addr,
Serializer: serializer,
}
}
func TestSocketWriter_tcp(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "tcp://" + listener.Addr().String()
err = sw.Connect()
require.NoError(t, err)
sw := newSocketWriter("tcp://" + listener.Addr().String())
require.NoError(t, sw.Connect())
lconn, err := listener.Accept()
require.NoError(t, err)
@ -33,11 +40,8 @@ func TestSocketWriter_udp(t *testing.T) {
listener, err := net.ListenPacket("udp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "udp://" + listener.LocalAddr().String()
err = sw.Connect()
require.NoError(t, err)
sw := newSocketWriter("udp://" + listener.LocalAddr().String())
require.NoError(t, sw.Connect())
testSocketWriterPacket(t, sw, listener)
}
@ -48,11 +52,8 @@ func TestSocketWriter_unix(t *testing.T) {
listener, err := net.Listen("unix", sock)
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "unix://" + sock
err = sw.Connect()
require.NoError(t, err)
sw := newSocketWriter("unix://" + sock)
require.NoError(t, sw.Connect())
lconn, err := listener.Accept()
require.NoError(t, err)
@ -70,11 +71,8 @@ func TestSocketWriter_unixgram(t *testing.T) {
listener, err := net.ListenPacket("unixgram", sock)
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "unixgram://" + sock
err = sw.Connect()
require.NoError(t, err)
sw := newSocketWriter("unixgram://" + sock)
require.NoError(t, sw.Connect())
testSocketWriterPacket(t, sw, listener)
}
@ -132,13 +130,9 @@ func TestSocketWriter_Write_err(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "tcp://" + listener.Addr().String()
err = sw.Connect()
require.NoError(t, err)
err = sw.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
sw := newSocketWriter("tcp://" + listener.Addr().String())
require.NoError(t, sw.Connect())
require.NoError(t, sw.Conn.(*net.TCPConn).SetReadBuffer(256))
lconn, err := listener.Accept()
require.NoError(t, err)
@ -163,13 +157,9 @@ func TestSocketWriter_Write_reconnect(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "tcp://" + listener.Addr().String()
err = sw.Connect()
require.NoError(t, err)
err = sw.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
sw := newSocketWriter("tcp://" + listener.Addr().String())
require.NoError(t, sw.Connect())
require.NoError(t, sw.Conn.(*net.TCPConn).SetReadBuffer(256))
lconn, err := listener.Accept()
require.NoError(t, err)
@ -206,12 +196,9 @@ func TestSocketWriter_udp_gzip(t *testing.T) {
listener, err := net.ListenPacket("udp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "udp://" + listener.LocalAddr().String()
sw := newSocketWriter("udp://" + listener.LocalAddr().String())
sw.ContentEncoding = "gzip"
err = sw.Connect()
require.NoError(t, err)
require.NoError(t, sw.Connect())
testSocketWriterPacket(t, sw, listener)
}

View File

@ -65,12 +65,16 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
)
func main() {
parser := influx.NewStreamParser(os.Stdin)
serializer, _ := serializers.NewInfluxSerializer()
serializer := influxSerializer.Serializer{}
if err := serializer.Init(); err != nil {
fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err)
os.Exit(1)
}
for {
metric, err := parser.Next()

View File

@ -13,7 +13,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -152,7 +152,8 @@ func TestMain(m *testing.M) {
func runCountMultiplierProgram() {
fieldName := os.Getenv("FIELD_NAME")
parser := influx.NewStreamParser(os.Stdin)
serializer := serializers.NewInfluxSerializer()
serializer := &influxSerializer.Serializer{}
_ = serializer.Init() // this should always succeed
for {
m, err := parser.Next()

View File

@ -7,7 +7,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
@ -15,13 +14,18 @@ import (
var sampleConfig string
type Printer struct {
serializer serializers.Serializer
serializer *influx.Serializer
}
func (*Printer) SampleConfig() string {
return sampleConfig
}
func (p *Printer) Init() error {
p.serializer = &influx.Serializer{}
return p.serializer.Init()
}
func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
octets, err := p.serializer.Serialize(metric)
@ -35,8 +39,6 @@ func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
func init() {
processors.Add("printer", func() telegraf.Processor {
return &Printer{
serializer: influx.NewSerializer(),
}
return &Printer{}
})
}

View File

@ -0,0 +1 @@
package all

View File

@ -0,0 +1,7 @@
//go:build !custom || serializers || serializers.influx
package all
import (
_ "github.com/influxdata/telegraf/plugins/serializers/influx" // register plugin
)

View File

@ -12,24 +12,11 @@ import (
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
)
const MaxInt64 = int64(^uint64(0) >> 1)
type FieldSortOrder int
const (
NoSortFields FieldSortOrder = iota
SortFields
)
type FieldTypeSupport int
const (
UintSupport FieldTypeSupport = 1 << iota
)
var (
MaxInt64 = int64(^uint64(0) >> 1)
NeedMoreSpace = "need more space"
InvalidName = "invalid name"
NoFields = "no serializable fields"
@ -59,10 +46,11 @@ func (e FieldError) Error() string {
// Serializer is a serializer for line protocol.
type Serializer struct {
maxLineBytes int
bytesWritten int
fieldSortOrder FieldSortOrder
fieldTypeSupport FieldTypeSupport
MaxLineBytes int `toml:"influx_max_line_bytes"`
SortFields bool `toml:"influx_sort_fields"`
UintSupport bool `toml:"influx_uint_support"`
bytesWritten int
buf bytes.Buffer
header []byte
@ -70,27 +58,12 @@ type Serializer struct {
pair []byte
}
func NewSerializer() *Serializer {
serializer := &Serializer{
fieldSortOrder: NoSortFields,
func (s *Serializer) Init() error {
s.header = make([]byte, 0, 50)
s.footer = make([]byte, 0, 21)
s.pair = make([]byte, 0, 50)
header: make([]byte, 0, 50),
footer: make([]byte, 0, 21),
pair: make([]byte, 0, 50),
}
return serializer
}
func (s *Serializer) SetMaxLineBytes(maxLineBytes int) {
s.maxLineBytes = maxLineBytes
}
func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) {
s.fieldSortOrder = order
}
func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
s.fieldTypeSupport = typeSupport
return nil
}
// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
@ -215,7 +188,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
s.buildFooter(m)
if s.fieldSortOrder == SortFields {
if s.SortFields {
sort.Slice(m.FieldList(), func(i, j int) bool {
return m.FieldList()[i].Key < m.FieldList()[j].Key
})
@ -239,7 +212,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
bytesNeeded++
}
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
if s.MaxLineBytes > 0 && bytesNeeded > s.MaxLineBytes {
// Need at least one field per line, this metric cannot be fit
// into the max line bytes.
if firstField {
@ -255,7 +228,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
firstField = true
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)
if bytesNeeded > s.maxLineBytes {
if bytesNeeded > s.MaxLineBytes {
return s.newMetricError(NeedMoreSpace)
}
}
@ -299,7 +272,7 @@ func (s *Serializer) newMetricError(reason string) *MetricError {
func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
switch v := value.(type) {
case uint64:
if s.fieldTypeSupport&UintSupport != 0 {
if s.UintSupport {
return appendUintField(buf, v), nil
}
if v <= uint64(MaxInt64) {
@ -349,3 +322,20 @@ func appendStringField(buf []byte, value string) []byte {
buf = append(buf, '"')
return buf
}
func init() {
serializers.Add("influx",
func() serializers.Serializer {
return &Serializer{}
},
)
}
// InitFromConfig is a compatibility function to construct the parser the old way
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
s.MaxLineBytes = cfg.InfluxMaxLineBytes
s.SortFields = cfg.InfluxSortFields
s.UintSupport = cfg.InfluxUintSupport
return nil
}

View File

@ -14,7 +14,7 @@ import (
var tests = []struct {
name string
maxBytes int
typeSupport FieldTypeSupport
uintSupport bool
input telegraf.Metric
output []byte
errReason string
@ -132,7 +132,7 @@ var tests = []struct {
time.Unix(0, 0),
),
output: []byte("cpu value=42u 0\n"),
typeSupport: UintSupport,
uintSupport: true,
},
{
name: "uint field max value",
@ -145,7 +145,7 @@ var tests = []struct {
time.Unix(0, 0),
),
output: []byte("cpu value=18446744073709551615u 0\n"),
typeSupport: UintSupport,
uintSupport: true,
},
{
name: "uint field no uint support",
@ -481,10 +481,11 @@ var tests = []struct {
func TestSerializer(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
serializer := NewSerializer()
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer := &Serializer{
MaxLineBytes: tt.maxBytes,
SortFields: true,
UintSupport: tt.uintSupport,
}
output, err := serializer.Serialize(tt.input)
if tt.errReason != "" {
require.Error(t, err)
@ -498,9 +499,10 @@ func TestSerializer(t *testing.T) {
func BenchmarkSerializer(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
serializer := NewSerializer()
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer := &Serializer{
MaxLineBytes: tt.maxBytes,
UintSupport: tt.uintSupport,
}
for n := 0; n < b.N; n++ {
output, err := serializer.Serialize(tt.input)
_ = err
@ -522,8 +524,9 @@ func TestSerialize_SerializeBatch(t *testing.T) {
metrics := []telegraf.Metric{m, m}
serializer := NewSerializer()
serializer.SetFieldSortOrder(SortFields)
serializer := &Serializer{
SortFields: true,
}
output, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output)

View File

@ -23,7 +23,7 @@ func NewReader(metrics []telegraf.Metric, serializer *Serializer) io.Reader {
metrics: metrics,
serializer: serializer,
offset: 0,
buf: bytes.NewBuffer(make([]byte, 0, serializer.maxLineBytes)),
buf: bytes.NewBuffer(make([]byte, 0, serializer.MaxLineBytes)),
}
}

View File

@ -127,9 +127,10 @@ func TestReader(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
serializer := NewSerializer()
serializer.SetMaxLineBytes(tt.maxLineBytes)
serializer.SetFieldSortOrder(SortFields)
serializer := &Serializer{
MaxLineBytes: tt.maxLineBytes,
SortFields: true,
}
reader := NewReader(tt.input, serializer)
data := new(bytes.Buffer)
@ -161,8 +162,9 @@ func TestZeroLengthBufferNoError(t *testing.T) {
},
time.Unix(0, 0),
)
serializer := NewSerializer()
serializer.SetFieldSortOrder(SortFields)
serializer := &Serializer{
SortFields: true,
}
reader := NewReader([]telegraf.Metric{m}, serializer)
readbuf := make([]byte, 0)
@ -243,7 +245,7 @@ func BenchmarkReader(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
readbuf := make([]byte, 4096)
serializer := NewSerializer()
serializer := &Serializer{}
reader := NewReader(metrics, serializer)
for {
_, err := reader.Read(readbuf)

View File

@ -9,7 +9,6 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/csv"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/msgpack"
"github.com/influxdata/telegraf/plugins/serializers/nowmetric"
@ -19,6 +18,17 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/wavefront"
)
// Creator is the function to create a new serializer
type Creator func() Serializer
// Serializers contains the registry of all known serializers (following the new style)
var Serializers = map[string]Creator{}
// Add adds a serializer to the registry. Usually this function is called in the plugin's init function
func Add(name string, creator Creator) {
Serializers[name] = creator
}
// SerializerOutput is an interface for output plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerOutput interface {
@ -46,6 +56,12 @@ type Serializer interface {
SerializeBatch(metrics []telegraf.Metric) ([]byte, error)
}
// SerializerCompatibility is an interface for backward-compatible initialization of serializers
type SerializerCompatibility interface {
// InitFromConfig sets the serializers internal variables from the old-style config
InitFromConfig(config *Config) error
}
// Config is a struct that covers the data types needed for all serializer types,
// and can be used to instantiate _any_ of the serializers.
type Config struct {
@ -153,8 +169,6 @@ func NewSerializer(config *Config) (Serializer, error) {
switch config.DataFormat {
case "csv":
serializer, err = NewCSVSerializer(config)
case "influx":
serializer, err = NewInfluxSerializerConfig(config), nil
case "graphite":
serializer, err = NewGraphiteSerializer(
config.Prefix,
@ -187,7 +201,20 @@ func NewSerializer(config *Config) (Serializer, error) {
case "msgpack":
serializer, err = NewMsgpackSerializer(), nil
default:
err = fmt.Errorf("invalid data format: %s", config.DataFormat)
creator, found := Serializers[config.DataFormat]
if !found {
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
}
// Try to create new-style serializers the old way...
serializer := creator()
p, ok := serializer.(SerializerCompatibility)
if !ok {
return nil, fmt.Errorf("serializer for %q cannot be created the old way", config.DataFormat)
}
err := p.InitFromConfig(config)
return serializer, err
}
return serializer, err
}
@ -263,28 +290,6 @@ func NewNowSerializer() (Serializer, error) {
return nowmetric.NewSerializer()
}
func NewInfluxSerializerConfig(config *Config) Serializer {
var sort influx.FieldSortOrder
if config.InfluxSortFields {
sort = influx.SortFields
}
var typeSupport influx.FieldTypeSupport
if config.InfluxUintSupport {
typeSupport = typeSupport + influx.UintSupport
}
s := influx.NewSerializer()
s.SetMaxLineBytes(config.InfluxMaxLineBytes)
s.SetFieldSortOrder(sort)
s.SetFieldTypeSupport(typeSupport)
return s
}
func NewInfluxSerializer() Serializer {
return influx.NewSerializer()
}
//nolint:revive //argument-limit conditionally more arguments allowed
func NewGraphiteSerializer(
prefix,

28
serializer.go Normal file
View File

@ -0,0 +1,28 @@
package telegraf
// SerializerPlugin is an interface for plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerPlugin interface {
// SetSerializer sets the serializer function for the interface.
SetSerializer(serializer Serializer)
}
// Serializer is an interface defining functions that a serializer plugin must
// satisfy.
//
// Implementations of this interface should be reentrant but are not required
// to be thread-safe.
type Serializer interface {
// Serialize takes a single telegraf metric and turns it into a byte buffer.
// separate metrics should be separated by a newline, and there should be
// a newline at the end of the buffer.
//
// New plugins should use SerializeBatch instead to allow for non-line
// delimited metrics.
Serialize(metric Metric) ([]byte, error)
// SerializeBatch takes an array of telegraf metric and serializes it into
// a byte buffer. This method is not required to be suitable for use with
// line oriented framing.
SerializeBatch(metrics []Metric) ([]byte, error)
}

View File

@ -68,9 +68,10 @@ func (s *selection) Filter(p packageCollection) *packageCollection {
enabled.packages[category] = categoryEnabledPackages
}
// Make sure we update the list of default parsers used by
// Make sure we update the list of default parsers and serializers used by
// the remaining packages
enabled.FillDefaultParsers()
enabled.FillDefaultSerializers()
// If the user did not configure any parser, we want to include
// the default parsers if any to preserve a functional set of
@ -88,6 +89,21 @@ func (s *selection) Filter(p packageCollection) *packageCollection {
enabled.packages["parsers"] = parsers
}
// If the user did not configure any serializer, we want to include
// the default one if any to preserve a functional set of plugins.
if len(enabled.packages["serializers"]) == 0 && len(enabled.defaultSerializers) > 0 {
var serializers []packageInfo
for _, pkg := range p.packages["serializers"] {
for _, name := range enabled.defaultSerializers {
if pkg.Plugin == name {
serializers = append(serializers, pkg)
break
}
}
}
enabled.packages["serializers"] = serializers
}
return &enabled
}

View File

@ -19,6 +19,7 @@ var categories = []string{
"parsers",
"processors",
"secretstores",
"serializers",
}
const description = `

View File

@ -27,16 +27,18 @@ var packageFilter = filter.MustCompile([]string{
})
type packageInfo struct {
Category string
Plugin string
Path string
Tag string
DefaultParser string
Category string
Plugin string
Path string
Tag string
DefaultParser string
DefaultSerializer string
}
type packageCollection struct {
packages map[string][]packageInfo
defaultParsers []string
packages map[string][]packageInfo
defaultParsers []string
defaultSerializers []string
}
// Define the package exceptions
@ -100,7 +102,8 @@ func (p *packageCollection) collectPackagesForCategory(category string) error {
}
// Extract potential default parsers for input and processor packages
var defaultParser string
// as well as serializers for the output package
var defaultParser, defaultSerializer string
switch category {
case "inputs", "processors":
var err error
@ -108,17 +111,24 @@ func (p *packageCollection) collectPackagesForCategory(category string) error {
if err != nil {
log.Printf("Getting default parser for %s.%s failed: %v", category, name, err)
}
case "outputs":
var err error
defaultSerializer, err = extractDefaultSerializer(path)
if err != nil {
log.Printf("Getting default serializer for %s.%s failed: %v", category, name, err)
}
}
for _, plugin := range registeredNames {
path := filepath.Join("plugins", category, element.Name())
tag := category + "." + element.Name()
entries = append(entries, packageInfo{
Category: category,
Plugin: plugin,
Path: filepath.ToSlash(path),
Tag: tag,
DefaultParser: defaultParser,
Category: category,
Plugin: plugin,
Path: filepath.ToSlash(path),
Tag: tag,
DefaultParser: defaultParser,
DefaultSerializer: defaultSerializer,
})
}
}
@ -148,6 +158,26 @@ func (p *packageCollection) FillDefaultParsers() {
}
}
func (p *packageCollection) FillDefaultSerializers() {
// Make sure we ignore all empty-named parsers which indicate
// that there is no parser used by the plugin.
serializers := map[string]bool{"": true}
// Iterate over all plugins that may have parsers and collect
// the defaults
p.defaultSerializers = make([]string, 0)
for _, category := range []string{"outputs"} {
for _, pkg := range p.packages[category] {
name := pkg.DefaultSerializer
if seen := serializers[name]; seen {
continue
}
p.defaultSerializers = append(p.defaultSerializers, name)
serializers[name] = true
}
}
}
func (p *packageCollection) CollectAvailable() error {
p.packages = make(map[string][]packageInfo)
@ -158,6 +188,7 @@ func (p *packageCollection) CollectAvailable() error {
}
p.FillDefaultParsers()
p.FillDefaultSerializers()
return nil
}
@ -352,3 +383,35 @@ func extractDefaultParser(pluginDir string) (string, error) {
return "", nil
}
func extractDefaultSerializer(pluginDir string) (string, error) {
re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`)
// Walk all config files in the package directory
elements, err := os.ReadDir(pluginDir)
if err != nil {
return "", err
}
for _, element := range elements {
path := filepath.Join(pluginDir, element.Name())
if element.IsDir() || filepath.Ext(element.Name()) != ".conf" {
continue
}
// Read the config and search for a "data_format" entry
file, err := os.Open(path)
if err != nil {
return "", err
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
match := re.FindStringSubmatch(scanner.Text())
if len(match) == 2 {
return match[1], nil
}
}
}
return "", nil
}