diff --git a/config/config.go b/config/config.go index 5fbdf084e..9a42d5304 100644 --- a/config/config.go +++ b/config/config.go @@ -1452,10 +1452,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) c.getFieldString(tbl, "prefix", &sc.Prefix) c.getFieldString(tbl, "template", &sc.Template) c.getFieldStringSlice(tbl, "templates", &sc.Templates) - c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix) - c.getFieldBool(tbl, "csv_header", &sc.CSVHeader) - c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator) - c.getFieldString(tbl, "csv_timestamp_format", &sc.TimestampFormat) c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport) @@ -1550,7 +1546,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Serializer options to ignore case "prefix", "template", "templates", - "csv_column_prefix", "csv_header", "csv_separator", "csv_timestamp_format", "graphite_strict_sanitize_regex", "graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", diff --git a/plugins/serializers/all/csv.go b/plugins/serializers/all/csv.go new file mode 100644 index 000000000..a6c727347 --- /dev/null +++ b/plugins/serializers/all/csv.go @@ -0,0 +1,7 @@ +//go:build !custom || serializers || serializers.csv + +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/serializers/csv" // register plugin +) diff --git a/plugins/serializers/csv/csv.go b/plugins/serializers/csv/csv.go index 38c133010..41201273e 100644 --- a/plugins/serializers/csv/csv.go +++ b/plugins/serializers/csv/csv.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/serializers" ) type Serializer struct { @@ -23,39 +24,32 @@ type Serializer struct { writer *csv.Writer } -func NewSerializer(timestampFormat, separator string, header, prefix bool) (*Serializer, error) { +func (s *Serializer) Init() error { // Setting defaults - if separator == "" { - separator = "," + if s.Separator == "" { + s.Separator = "," } // Check inputs - if len(separator) > 1 { - return nil, fmt.Errorf("invalid separator %q", separator) + if len(s.Separator) > 1 { + return fmt.Errorf("invalid separator %q", s.Separator) } - switch timestampFormat { + switch s.TimestampFormat { case "": - timestampFormat = "unix" + s.TimestampFormat = "unix" case "unix", "unix_ms", "unix_us", "unix_ns": default: - if time.Now().Format(timestampFormat) == timestampFormat { - return nil, fmt.Errorf("invalid timestamp format %q", timestampFormat) + if time.Now().Format(s.TimestampFormat) == s.TimestampFormat { + return fmt.Errorf("invalid timestamp format %q", s.TimestampFormat) } } - s := &Serializer{ - TimestampFormat: timestampFormat, - Separator: separator, - Header: header, - Prefix: prefix, - } - // Initialize the writer s.writer = csv.NewWriter(&s.buffer) - s.writer.Comma = []rune(separator)[0] + s.writer.Comma = []rune(s.Separator)[0] s.writer.UseCRLF = runtime.GOOS == "windows" - return s, nil + return nil } func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { @@ -174,3 +168,21 @@ func (s *Serializer) writeData(metric telegraf.Metric) error { return s.writer.Write(columns) } + +func init() { + serializers.Add("csv", + func() serializers.Serializer { + return &Serializer{} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (s *Serializer) InitFromConfig(cfg *serializers.Config) error { + s.TimestampFormat = cfg.TimestampFormat + s.Separator = cfg.CSVSeparator + s.Header = cfg.CSVHeader + s.Prefix = cfg.CSVPrefix + + return nil +} diff --git a/plugins/serializers/csv/csv_test.go b/plugins/serializers/csv/csv_test.go index f4443c30a..ade0c10cf 100644 --- a/plugins/serializers/csv/csv_test.go +++ b/plugins/serializers/csv/csv_test.go @@ -15,18 +15,24 @@ import ( ) func TestInvalidTimestampFormat(t *testing.T) { - _, err := NewSerializer("garbage", "", false, false) - require.EqualError(t, err, `invalid timestamp format "garbage"`) + s := Serializer{ + TimestampFormat: "garbage", + } + require.EqualError(t, s.Init(), `invalid timestamp format "garbage"`) } func TestInvalidSeparator(t *testing.T) { - _, err := NewSerializer("", "garbage", false, false) - require.EqualError(t, err, `invalid separator "garbage"`) + s := Serializer{ + Separator: "garbage", + } + require.EqualError(t, s.Init(), `invalid separator "garbage"`) - serializer, err := NewSerializer("", "\n", false, false) - require.NoError(t, err) + s = Serializer{ + Separator: "\n", + } + require.NoError(t, s.Init()) - _, err = serializer.Serialize(testutil.TestMetric(42.3, "test")) + _, err := s.Serialize(testutil.TestMetric(42.3, "test")) require.EqualError(t, err, "writing data failed: csv: invalid field or comment delimiter") } @@ -81,8 +87,13 @@ func TestSerializeTransformationNonBatch(t *testing.T) { require.NoError(t, err) // Serialize - serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix) - require.NoError(t, err) + serializer := Serializer{ + TimestampFormat: cfg.TimestampFormat, + Separator: cfg.Separator, + Header: cfg.Header, + Prefix: cfg.Prefix, + } + require.NoError(t, serializer.Init()) // expected results use LF endings serializer.writer.UseCRLF = false var actual bytes.Buffer @@ -149,8 +160,13 @@ func TestSerializeTransformationBatch(t *testing.T) { require.NoError(t, err) // Serialize - serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix) - require.NoError(t, err) + serializer := Serializer{ + TimestampFormat: cfg.TimestampFormat, + Separator: cfg.Separator, + Header: cfg.Header, + Prefix: cfg.Prefix, + } + require.NoError(t, serializer.Init()) // expected results use LF endings serializer.writer.UseCRLF = false actual, err := serializer.SerializeBatch(metrics) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index fd8ce782f..8e6dee15e 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/serializers/csv" "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/msgpack" @@ -166,8 +165,6 @@ func NewSerializer(config *Config) (Serializer, error) { var err error var serializer Serializer switch config.DataFormat { - case "csv": - serializer, err = NewCSVSerializer(config) case "graphite": serializer, err = NewGraphiteSerializer( config.Prefix, @@ -216,10 +213,6 @@ func NewSerializer(config *Config) (Serializer, error) { return serializer, err } -func NewCSVSerializer(config *Config) (Serializer, error) { - return csv.NewSerializer(config.TimestampFormat, config.CSVSeparator, config.CSVHeader, config.CSVPrefix) -} - func NewPrometheusRemoteWriteSerializer(config *Config) Serializer { sortMetrics := prometheusremotewrite.NoSortMetrics if config.PrometheusExportTimestamp {