diff --git a/plugins/serializers/csv/README.md b/plugins/serializers/csv/README.md index c3c252158..56a9687ed 100644 --- a/plugins/serializers/csv/README.md +++ b/plugins/serializers/csv/README.md @@ -33,6 +33,20 @@ The `csv` output data format converts metrics into CSV lines. ## Prefix tag and field columns with "tag_" and "field_" respectively. ## This can be helpful if you need to know the "type" of a column. # csv_column_prefix = false + + ## Use the specified order for the columns. + ## This can be helpful if you need a specific output order. To specify tags, + ## use a `tag.` prefix, for fields use a `field.` prefix and use `name` and + ## `timestamp` to reference the measurement name and timestamp respectively. + ## NOTE: The output will only contain the specified tags, fields, etc. All + ## other data will be dropped. In case a tag or field does not exist, + ## the column will be empty. + ## ex. csv_columns = ["timestamp", "tag.host", "field.value"] + ## + ## By default all metric data will be written in the order: + ## timestamp, name, tags..., fields... + ## with tags and fields being ordered alphabetically. + # csv_columns = [] ``` ## Examples diff --git a/plugins/serializers/csv/csv.go b/plugins/serializers/csv/csv.go index 3007f1c08..d9ba94f74 100644 --- a/plugins/serializers/csv/csv.go +++ b/plugins/serializers/csv/csv.go @@ -7,6 +7,7 @@ import ( "runtime" "sort" "strconv" + "strings" "time" "unicode/utf8" @@ -16,10 +17,11 @@ import ( ) type Serializer struct { - TimestampFormat string `toml:"csv_timestamp_format"` - Separator string `toml:"csv_separator"` - Header bool `toml:"csv_header"` - Prefix bool `toml:"csv_column_prefix"` + TimestampFormat string `toml:"csv_timestamp_format"` + Separator string `toml:"csv_separator"` + Header bool `toml:"csv_header"` + Prefix bool `toml:"csv_column_prefix"` + Columns []string `toml:"csv_columns"` buffer bytes.Buffer writer *csv.Writer @@ -45,6 +47,17 @@ func (s *Serializer) Init() error { } } + // Check columns if any + for _, name := range s.Columns { + switch { + case name == "timestamp", name == "name", + strings.HasPrefix(name, "tag."), + strings.HasPrefix(name, "field."): + default: + return fmt.Errorf("invalid column reference %q", name) + } + } + // Initialize the writer s.writer = csv.NewWriter(&s.buffer) s.writer.Comma, _ = utf8.DecodeRuneInString(s.Separator) @@ -54,25 +67,7 @@ func (s *Serializer) Init() error { } func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { - // Clear the buffer - s.buffer.Truncate(0) - - // Write the header if the user wants us to - if s.Header { - if err := s.writeHeader(metric); err != nil { - return nil, fmt.Errorf("writing header failed: %w", err) - } - s.Header = false - } - - // Write the data - if err := s.writeData(metric); err != nil { - return nil, fmt.Errorf("writing data failed: %w", err) - } - - // Finish up - s.writer.Flush() - return s.buffer.Bytes(), nil + return s.SerializeBatch([]telegraf.Metric{metric}) } func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { @@ -85,15 +80,27 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { // Write the header if the user wants us to if s.Header { - if err := s.writeHeader(metrics[0]); err != nil { - return nil, fmt.Errorf("writing header failed: %w", err) + if len(s.Columns) > 0 { + if err := s.writeHeaderOrdered(); err != nil { + return nil, fmt.Errorf("writing header failed: %w", err) + } + } else { + if err := s.writeHeader(metrics[0]); err != nil { + return nil, fmt.Errorf("writing header failed: %w", err) + } } s.Header = false } for _, m := range metrics { - if err := s.writeData(m); err != nil { - return nil, fmt.Errorf("writing data failed: %w", err) + if len(s.Columns) > 0 { + if err := s.writeDataOrdered(m); err != nil { + return nil, fmt.Errorf("writing data failed: %w", err) + } + } else { + if err := s.writeData(m); err != nil { + return nil, fmt.Errorf("writing data failed: %w", err) + } } } @@ -130,6 +137,21 @@ func (s *Serializer) writeHeader(metric telegraf.Metric) error { return s.writer.Write(columns) } +func (s *Serializer) writeHeaderOrdered() error { + columns := make([]string, 0, len(s.Columns)) + for _, name := range s.Columns { + if s.Prefix { + name = strings.ReplaceAll(name, ".", "_") + } else { + name = strings.TrimPrefix(name, "tag.") + name = strings.TrimPrefix(name, "field.") + } + columns = append(columns, name) + } + + return s.writer.Write(columns) +} + func (s *Serializer) writeData(metric telegraf.Metric) error { var timestamp string @@ -170,6 +192,50 @@ func (s *Serializer) writeData(metric telegraf.Metric) error { return s.writer.Write(columns) } +func (s *Serializer) writeDataOrdered(metric telegraf.Metric) error { + var timestamp string + + // Format the time + switch s.TimestampFormat { + case "unix": + timestamp = strconv.FormatInt(metric.Time().Unix(), 10) + case "unix_ms": + timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000_000, 10) + case "unix_us": + timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000, 10) + case "unix_ns": + timestamp = strconv.FormatInt(metric.Time().UnixNano(), 10) + default: + timestamp = metric.Time().UTC().Format(s.TimestampFormat) + } + + columns := make([]string, 0, len(s.Columns)) + for _, name := range s.Columns { + switch { + case name == "timestamp": + columns = append(columns, timestamp) + case name == "name": + columns = append(columns, metric.Name()) + case strings.HasPrefix(name, "tag."): + v, _ := metric.GetTag(strings.TrimPrefix(name, "tag.")) + columns = append(columns, v) + case strings.HasPrefix(name, "field."): + var v string + field := strings.TrimPrefix(name, "field.") + if raw, ok := metric.GetField(field); ok { + var err error + v, err = internal.ToString(raw) + if err != nil { + return fmt.Errorf("converting field %q to string failed: %w", field, err) + } + } + columns = append(columns, v) + } + } + + return s.writer.Write(columns) +} + func init() { serializers.Add("csv", func() serializers.Serializer { diff --git a/plugins/serializers/csv/csv_test.go b/plugins/serializers/csv/csv_test.go index 62c21cc92..36d1bb563 100644 --- a/plugins/serializers/csv/csv_test.go +++ b/plugins/serializers/csv/csv_test.go @@ -66,6 +66,22 @@ func TestSerializeTransformationNonBatch(t *testing.T) { name: "header and semicolon", filename: "testcases/semicolon.conf", }, + { + name: "ordered without header", + filename: "testcases/ordered.conf", + }, + { + name: "ordered with header", + filename: "testcases/ordered_with_header.conf", + }, + { + name: "ordered with header and prefix", + filename: "testcases/ordered_with_header_prefix.conf", + }, + { + name: "ordered non-existing fields and tags", + filename: "testcases/ordered_not_exist.conf", + }, } parser := &influx.Parser{} require.NoError(t, parser.Init()) @@ -93,6 +109,7 @@ func TestSerializeTransformationNonBatch(t *testing.T) { Separator: cfg.Separator, Header: cfg.Header, Prefix: cfg.Prefix, + Columns: cfg.Columns, } require.NoError(t, serializer.Init()) // expected results use LF endings @@ -139,6 +156,22 @@ func TestSerializeTransformationBatch(t *testing.T) { name: "header and semicolon", filename: "testcases/semicolon.conf", }, + { + name: "ordered without header", + filename: "testcases/ordered.conf", + }, + { + name: "ordered with header", + filename: "testcases/ordered_with_header.conf", + }, + { + name: "ordered with header and prefix", + filename: "testcases/ordered_with_header_prefix.conf", + }, + { + name: "ordered non-existing fields and tags", + filename: "testcases/ordered_not_exist.conf", + }, } parser := &influx.Parser{} require.NoError(t, parser.Init()) @@ -166,6 +199,7 @@ func TestSerializeTransformationBatch(t *testing.T) { Separator: cfg.Separator, Header: cfg.Header, Prefix: cfg.Prefix, + Columns: cfg.Columns, } require.NoError(t, serializer.Init()) // expected results use LF endings diff --git a/plugins/serializers/csv/testcases/ordered.conf b/plugins/serializers/csv/testcases/ordered.conf new file mode 100644 index 000000000..e5716ef89 --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered.conf @@ -0,0 +1,11 @@ +# Example for outputting CSV with a specified column order +# +# Output File: +# testcases/ordered.csv +# +# Input: +# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000 +# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000 + +csv_timestamp_format = "unix_ns" +csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"] diff --git a/plugins/serializers/csv/testcases/ordered.csv b/plugins/serializers/csv/testcases/ordered.csv new file mode 100644 index 000000000..894509fe0 --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered.csv @@ -0,0 +1,2 @@ +1653643420000000000,23.7,987.5,A1 +1653646789000000000,39.9,1022.6,X9 diff --git a/plugins/serializers/csv/testcases/ordered_not_exist.conf b/plugins/serializers/csv/testcases/ordered_not_exist.conf new file mode 100644 index 000000000..ceef3b0df --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_not_exist.conf @@ -0,0 +1,12 @@ +# Example for outputting CSV with a specified column order +# +# Output File: +# testcases/ordered_not_exist.csv +# +# Input: +# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000 +# mymetric,machine=X9,host=83d2e491ca01 status="healthy",pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000 + +csv_timestamp_format = "unix_ns" +csv_header = true +csv_columns = ["timestamp", "field.temperature", "field.pressure", "field.status", "tag.location", "tag.machine"] diff --git a/plugins/serializers/csv/testcases/ordered_not_exist.csv b/plugins/serializers/csv/testcases/ordered_not_exist.csv new file mode 100644 index 000000000..f84696356 --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_not_exist.csv @@ -0,0 +1,3 @@ +timestamp,temperature,pressure,status,location,machine +1653643420000000000,23.7,987.5,,,A1 +1653646789000000000,39.9,1022.6,healthy,,X9 diff --git a/plugins/serializers/csv/testcases/ordered_with_header.conf b/plugins/serializers/csv/testcases/ordered_with_header.conf new file mode 100644 index 000000000..41d43b82d --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_with_header.conf @@ -0,0 +1,12 @@ +# Example for outputting CSV with a specified column order +# +# Output File: +# testcases/ordered_with_header.csv +# +# Input: +# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000 +# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000 + +csv_timestamp_format = "unix_ns" +csv_header = true +csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"] diff --git a/plugins/serializers/csv/testcases/ordered_with_header.csv b/plugins/serializers/csv/testcases/ordered_with_header.csv new file mode 100644 index 000000000..3517384a8 --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_with_header.csv @@ -0,0 +1,3 @@ +timestamp,temperature,pressure,machine +1653643420000000000,23.7,987.5,A1 +1653646789000000000,39.9,1022.6,X9 diff --git a/plugins/serializers/csv/testcases/ordered_with_header_prefix.conf b/plugins/serializers/csv/testcases/ordered_with_header_prefix.conf new file mode 100644 index 000000000..57ad090fd --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_with_header_prefix.conf @@ -0,0 +1,13 @@ +# Example for outputting CSV with a specified column order +# +# Output File: +# testcases/ordered_with_header_prefix.csv +# +# Input: +# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000 +# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000 + +csv_timestamp_format = "unix_ns" +csv_header = true +csv_column_prefix = true +csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"] diff --git a/plugins/serializers/csv/testcases/ordered_with_header_prefix.csv b/plugins/serializers/csv/testcases/ordered_with_header_prefix.csv new file mode 100644 index 000000000..221c48a61 --- /dev/null +++ b/plugins/serializers/csv/testcases/ordered_with_header_prefix.csv @@ -0,0 +1,3 @@ +timestamp,field_temperature,field_pressure,tag_machine +1653643420000000000,23.7,987.5,A1 +1653646789000000000,39.9,1022.6,X9