diff --git a/config/config.go b/config/config.go index 60e9b88e9..a2701c693 100644 --- a/config/config.go +++ b/config/config.go @@ -1676,8 +1676,12 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) c.getFieldStringSlice(tbl, "templates", &sc.Templates) c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format) c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar) + c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix) + c.getFieldBool(tbl, "csv_header", &sc.CSVHeader) + c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator) + c.getFieldString(tbl, "csv_timestamp_format", &sc.TimestampFormat) c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport) c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport) @@ -1744,6 +1747,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", "collection_offset", + "csv_column_prefix", "csv_header", "csv_separator", "csv_timestamp_format", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", "dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path", "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index 720c922de..d7ad8367b 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -6,6 +6,7 @@ plugins. 1. [InfluxDB Line Protocol](/plugins/serializers/influx) 1. [Carbon2](/plugins/serializers/carbon2) +1. [CSV](/plugins/serializers/csv) 1. [Graphite](/plugins/serializers/graphite) 1. 
[JSON](/plugins/serializers/json) 1. [MessagePack](/plugins/serializers/msgpack) diff --git a/plugins/serializers/csv/README.md b/plugins/serializers/csv/README.md new file mode 100644 index 000000000..c3c252158 --- /dev/null +++ b/plugins/serializers/csv/README.md @@ -0,0 +1,55 @@ +# CSV Serializer + +The `csv` output data format converts metrics into CSV lines. + +## Configuration + +```toml +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["stdout", "/tmp/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "csv" + + ## The default timestamp format is Unix epoch time. + # Other timestamp layout can be configured using the Go language time + # layout specification from https://golang.org/pkg/time/#Time.Format + # e.g.: csv_timestamp_format = "2006-01-02T15:04:05Z07:00" + # csv_timestamp_format = "unix" + + ## The default separator for the CSV format. + # csv_separator = "," + + ## Output the CSV header in the first line. + ## Enable the header when outputting metrics to a new file. + ## Disable when appending to a file or when using a stateless + ## output to prevent headers appearing between data lines. + # csv_header = false + + ## Prefix tag and field columns with "tag_" and "field_" respectively. + ## This can be helpful if you need to know the "type" of a column. + # csv_column_prefix = false +``` + +## Examples + +Standard form: + +```csv +1458229140,docker,raynor,30,4,...,59,660 +``` + +When an output plugin needs to emit multiple metrics at one time, it may use +the batch format. The use of batch format is determined by the plugin, +reference the documentation for the specific plugin. 
With `csv_header = true` +you get + +```csv +timestamp,measurement,host,field_1,field_2,...,field_N,n_images +1458229140,docker,raynor,30,4,...,59,660 +1458229143,docker,raynor,28,5,...,60,665 +``` diff --git a/plugins/serializers/csv/csv.go b/plugins/serializers/csv/csv.go new file mode 100644 index 000000000..38c133010 --- /dev/null +++ b/plugins/serializers/csv/csv.go @@ -0,0 +1,176 @@ +package csv + +import ( + "bytes" + "encoding/csv" + "fmt" + "runtime" + "sort" + "strconv" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" +) + +type Serializer struct { + TimestampFormat string `toml:"csv_timestamp_format"` + Separator string `toml:"csv_separator"` + Header bool `toml:"csv_header"` + Prefix bool `toml:"csv_column_prefix"` + + buffer bytes.Buffer + writer *csv.Writer +} + +func NewSerializer(timestampFormat, separator string, header, prefix bool) (*Serializer, error) { + // Setting defaults + if separator == "" { + separator = "," + } + + // Check inputs + if len(separator) > 1 { + return nil, fmt.Errorf("invalid separator %q", separator) + } + switch timestampFormat { + case "": + timestampFormat = "unix" + case "unix", "unix_ms", "unix_us", "unix_ns": + default: + if time.Now().Format(timestampFormat) == timestampFormat { + return nil, fmt.Errorf("invalid timestamp format %q", timestampFormat) + } + } + + s := &Serializer{ + TimestampFormat: timestampFormat, + Separator: separator, + Header: header, + Prefix: prefix, + } + + // Initialize the writer + s.writer = csv.NewWriter(&s.buffer) + s.writer.Comma = []rune(separator)[0] + s.writer.UseCRLF = runtime.GOOS == "windows" + + return s, nil +} + +func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { + // Clear the buffer + s.buffer.Truncate(0) + + // Write the header if the user wants us to + if s.Header { + if err := s.writeHeader(metric); err != nil { + return nil, fmt.Errorf("writing header failed: %w", err) + } + s.Header = false + } + + // Write 
the data + if err := s.writeData(metric); err != nil { + return nil, fmt.Errorf("writing data failed: %w", err) + } + + // Finish up + s.writer.Flush() + return s.buffer.Bytes(), nil +} + +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + if len(metrics) < 1 { + return nil, nil + } + + // Clear the buffer + s.buffer.Truncate(0) + + // Write the header if the user wants us to + if s.Header { + if err := s.writeHeader(metrics[0]); err != nil { + return nil, fmt.Errorf("writing header failed: %w", err) + } + s.Header = false + } + + for _, m := range metrics { + if err := s.writeData(m); err != nil { + return nil, fmt.Errorf("writing data failed: %w", err) + } + } + + // Finish up + s.writer.Flush() + return s.buffer.Bytes(), nil +} + +func (s *Serializer) writeHeader(metric telegraf.Metric) error { + columns := []string{ + "timestamp", + "measurement", + } + for _, tag := range metric.TagList() { + if s.Prefix { + columns = append(columns, "tag_"+tag.Key) + } else { + columns = append(columns, tag.Key) + } + } + + // Sort the fields by name + sort.Slice(metric.FieldList(), func(i, j int) bool { + return metric.FieldList()[i].Key < metric.FieldList()[j].Key + }) + for _, field := range metric.FieldList() { + if s.Prefix { + columns = append(columns, "field_"+field.Key) + } else { + columns = append(columns, field.Key) + } + } + + return s.writer.Write(columns) +} + +func (s *Serializer) writeData(metric telegraf.Metric) error { + var timestamp string + + // Format the time + switch s.TimestampFormat { + case "unix": + timestamp = strconv.FormatInt(metric.Time().Unix(), 10) + case "unix_ms": + timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000_000, 10) + case "unix_us": + timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000, 10) + case "unix_ns": + timestamp = strconv.FormatInt(metric.Time().UnixNano(), 10) + default: + timestamp = metric.Time().UTC().Format(s.TimestampFormat) + } + + columns := []string{ + timestamp, 
+ metric.Name(), + } + for _, tag := range metric.TagList() { + columns = append(columns, tag.Value) + } + + // Sort the fields by name + sort.Slice(metric.FieldList(), func(i, j int) bool { + return metric.FieldList()[i].Key < metric.FieldList()[j].Key + }) + for _, field := range metric.FieldList() { + v, err := internal.ToString(field.Value) + if err != nil { + return fmt.Errorf("converting field %q to string failed: %w", field.Key, err) + } + columns = append(columns, v) + } + + return s.writer.Write(columns) +} diff --git a/plugins/serializers/csv/csv_test.go b/plugins/serializers/csv/csv_test.go new file mode 100644 index 000000000..f0e10ba5a --- /dev/null +++ b/plugins/serializers/csv/csv_test.go @@ -0,0 +1,181 @@ +package csv + +import ( + "bytes" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/influxdata/toml" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +func TestInvalidTimestampFormat(t *testing.T) { + _, err := NewSerializer("garbage", "", false, false) + require.EqualError(t, err, `invalid timestamp format "garbage"`) +} + +func TestInvalidSeparator(t *testing.T) { + _, err := NewSerializer("", "garbage", false, false) + require.EqualError(t, err, `invalid separator "garbage"`) + + serializer, err := NewSerializer("", "\n", false, false) + require.NoError(t, err) + + _, err = serializer.Serialize(testutil.TestMetric(42.3, "test")) + require.EqualError(t, err, "writing data failed: csv: invalid field or comment delimiter") +} + +func TestSerializeTransformationNonBatch(t *testing.T) { + var tests = []struct { + name string + filename string + }{ + { + name: "basic", + filename: "testcases/basic.conf", + }, + { + name: "unix nanoseconds timestamp", + filename: "testcases/nanoseconds.conf", + }, + { + name: "header", + filename: "testcases/header.conf", + }, + { + name: "header with prefix", + filename: "testcases/prefix.conf", + }, + { 
+ name: "header and RFC3339 timestamp", + filename: "testcases/rfc3339.conf", + }, + { + name: "header and semicolon", + filename: "testcases/semicolon.conf", + }, + } + parser := influx.NewParser(influx.NewMetricHandler()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filename := filepath.FromSlash(tt.filename) + cfg, header, err := loadTestConfiguration(filename) + require.NoError(t, err) + + // Get the input metrics + metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser) + require.NoError(t, err) + + // Get the expectations + expectedFn, err := testutil.ParseRawLinesFrom(header, "Output File:") + require.NoError(t, err) + require.Len(t, expectedFn, 1, "only a single output file is supported") + expected, err := loadCSV(expectedFn[0]) + require.NoError(t, err) + + // Serialize + serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix) + require.NoError(t, err) + var actual bytes.Buffer + for _, m := range metrics { + buf, err := serializer.Serialize(m) + require.NoError(t, err) + _, err = actual.ReadFrom(bytes.NewReader(buf)) + require.NoError(t, err) + } + // Compare + require.EqualValues(t, string(expected), actual.String()) + }) + } +} + +func TestSerializeTransformationBatch(t *testing.T) { + var tests = []struct { + name string + filename string + }{ + { + name: "basic", + filename: "testcases/basic.conf", + }, + { + name: "unix nanoseconds timestamp", + filename: "testcases/nanoseconds.conf", + }, + { + name: "header", + filename: "testcases/header.conf", + }, + { + name: "header with prefix", + filename: "testcases/prefix.conf", + }, + { + name: "header and RFC3339 timestamp", + filename: "testcases/rfc3339.conf", + }, + { + name: "header and semicolon", + filename: "testcases/semicolon.conf", + }, + } + parser := influx.NewParser(influx.NewMetricHandler()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filename := filepath.FromSlash(tt.filename) + cfg, 
header, err := loadTestConfiguration(filename) + require.NoError(t, err) + + // Get the input metrics + metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser) + require.NoError(t, err) + + // Get the expectations + expectedFn, err := testutil.ParseRawLinesFrom(header, "Output File:") + require.NoError(t, err) + require.Len(t, expectedFn, 1, "only a single output file is supported") + expected, err := loadCSV(expectedFn[0]) + require.NoError(t, err) + + // Serialize + serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix) + require.NoError(t, err) + actual, err := serializer.SerializeBatch(metrics) + require.NoError(t, err) + + // Compare + require.EqualValues(t, string(expected), string(actual)) + }) + } +} + +type Config Serializer + +func loadTestConfiguration(filename string) (*Config, []string, error) { + buf, err := os.ReadFile(filename) + if err != nil { + return nil, nil, err + } + + header := make([]string, 0) + for _, line := range strings.Split(string(buf), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "#") { + header = append(header, line) + } + } + var cfg Config + err = toml.Unmarshal(buf, &cfg) + return &cfg, header, err +} + +func loadCSV(filename string) ([]byte, error) { + return os.ReadFile(filename) +} diff --git a/plugins/serializers/csv/testcases/basic.conf b/plugins/serializers/csv/testcases/basic.conf new file mode 100644 index 000000000..9c856a268 --- /dev/null +++ b/plugins/serializers/csv/testcases/basic.conf @@ -0,0 +1,8 @@ +# Example for outputting CSV +# +# Output File: +# testcases/basic.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 diff --git a/plugins/serializers/csv/testcases/basic.csv b/plugins/serializers/csv/testcases/basic.csv new file 
mode 100644 index 000000000..23de4b3b3 --- /dev/null +++ b/plugins/serializers/csv/testcases/basic.csv @@ -0,0 +1,2 @@ +1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5 +1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42 diff --git a/plugins/serializers/csv/testcases/header.conf b/plugins/serializers/csv/testcases/header.conf new file mode 100644 index 000000000..26b66559e --- /dev/null +++ b/plugins/serializers/csv/testcases/header.conf @@ -0,0 +1,10 @@ +# Example for outputting CSV in non-batch mode. +# +# Output File: +# testcases/header.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +csv_header = true \ No newline at end of file diff --git a/plugins/serializers/csv/testcases/header.csv b/plugins/serializers/csv/testcases/header.csv new file mode 100644 index 000000000..95875995d --- /dev/null +++ b/plugins/serializers/csv/testcases/header.csv @@ -0,0 +1,3 @@ +timestamp,measurement,flagname,host,key,platform,sdkver,value,count_sum +1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5 +1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42 diff --git a/plugins/serializers/csv/testcases/nanoseconds.conf b/plugins/serializers/csv/testcases/nanoseconds.conf new file mode 100644 index 000000000..33e2126d0 --- /dev/null +++ b/plugins/serializers/csv/testcases/nanoseconds.conf @@ -0,0 +1,10 @@ +# Example for outputting CSV +# +# Output File: +# testcases/nanoseconds.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420123456 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789789012 + +csv_timestamp_format = "unix_ns" \ No newline at end of file diff --git 
a/plugins/serializers/csv/testcases/nanoseconds.csv b/plugins/serializers/csv/testcases/nanoseconds.csv new file mode 100644 index 000000000..d241ea9c8 --- /dev/null +++ b/plugins/serializers/csv/testcases/nanoseconds.csv @@ -0,0 +1,2 @@ +1653643420123456,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5 +1653646789789012,expression,E42,klaus,67890,Golang,1.18.3,true,42 diff --git a/plugins/serializers/csv/testcases/prefix.conf b/plugins/serializers/csv/testcases/prefix.conf new file mode 100644 index 000000000..127875947 --- /dev/null +++ b/plugins/serializers/csv/testcases/prefix.conf @@ -0,0 +1,11 @@ +# Example for outputting CSV in non-batch mode. +# +# Output File: +# testcases/prefix.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +csv_header = true +csv_column_prefix = true \ No newline at end of file diff --git a/plugins/serializers/csv/testcases/prefix.csv b/plugins/serializers/csv/testcases/prefix.csv new file mode 100644 index 000000000..ccc61a042 --- /dev/null +++ b/plugins/serializers/csv/testcases/prefix.csv @@ -0,0 +1,3 @@ +timestamp,measurement,tag_flagname,tag_host,tag_key,tag_platform,tag_sdkver,tag_value,field_count_sum +1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5 +1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42 diff --git a/plugins/serializers/csv/testcases/rfc3339.conf b/plugins/serializers/csv/testcases/rfc3339.conf new file mode 100644 index 000000000..23f6b3332 --- /dev/null +++ b/plugins/serializers/csv/testcases/rfc3339.conf @@ -0,0 +1,11 @@ +# Example for outputting CSV in non-batch mode. 
+# +# Output File: +# testcases/rfc3339.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +csv_timestamp_format = "2006-01-02T15:04:05Z07:00" +csv_header = true \ No newline at end of file diff --git a/plugins/serializers/csv/testcases/rfc3339.csv b/plugins/serializers/csv/testcases/rfc3339.csv new file mode 100644 index 000000000..11973c151 --- /dev/null +++ b/plugins/serializers/csv/testcases/rfc3339.csv @@ -0,0 +1,3 @@ +timestamp,measurement,flagname,host,key,platform,sdkver,value,count_sum +2022-05-27T09:23:40Z,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5 +2022-05-27T10:19:49Z,expression,E42,klaus,67890,Golang,1.18.3,true,42 diff --git a/plugins/serializers/csv/testcases/semicolon.conf b/plugins/serializers/csv/testcases/semicolon.conf new file mode 100644 index 000000000..644e0c602 --- /dev/null +++ b/plugins/serializers/csv/testcases/semicolon.conf @@ -0,0 +1,11 @@ +# Example for outputting CSV in non-batch mode. 
+# +# Output File: +# testcases/semicolon.csv +# +# Input: +# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000 +# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000 + +csv_separator = ";" +csv_header = true diff --git a/plugins/serializers/csv/testcases/semicolon.csv b/plugins/serializers/csv/testcases/semicolon.csv new file mode 100644 index 000000000..d6a6cb448 --- /dev/null +++ b/plugins/serializers/csv/testcases/semicolon.csv @@ -0,0 +1,3 @@ +timestamp;measurement;flagname;host;key;platform;sdkver;value;count_sum +1653643420;impression;F5;1cbbb3796fc2;12345;Java;4.9.1;false;5 +1653646789;expression;E42;klaus;67890;Golang;1.18.3;true;42 diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 97938c8a5..603b6bd73 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/serializers/carbon2" + "github.com/influxdata/telegraf/plugins/serializers/csv" "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" @@ -56,6 +57,15 @@ type Config struct { // Character used for metric name sanitization in Carbon2. 
Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"` + // Separator for CSV + CSVSeparator string `toml:"csv_separator"` + + // Output a CSV header for naming the columns + CSVHeader bool `toml:"csv_header"` + + // Prefix the tag and field columns for CSV format + CSVPrefix bool `toml:"csv_column_prefix"` + // Support tags in graphite protocol GraphiteTagSupport bool `toml:"graphite_tag_support"` @@ -88,7 +98,7 @@ type Config struct { // Timestamp units to use for JSON formatted output TimestampUnits time.Duration `toml:"timestamp_units"` - // Timestamp format to use for JSON formatted output + // Timestamp format to use for JSON and CSV formatted output TimestampFormat string `toml:"timestamp_format"` // Include HEC routing fields for splunkmetric output @@ -124,6 +134,8 @@ func NewSerializer(config *Config) (Serializer, error) { var err error var serializer Serializer switch config.DataFormat { + case "csv": + serializer, err = NewCSVSerializer(config) case "influx": serializer, err = NewInfluxSerializerConfig(config) case "graphite": @@ -150,6 +162,10 @@ func NewSerializer(config *Config) (Serializer, error) { return serializer, err } +func NewCSVSerializer(config *Config) (Serializer, error) { + return csv.NewSerializer(config.TimestampFormat, config.CSVSeparator, config.CSVHeader, config.CSVPrefix) +} + func NewPrometheusRemoteWriteSerializer(config *Config) (Serializer, error) { sortMetrics := prometheusremotewrite.NoSortMetrics if config.PrometheusExportTimestamp {