feat(serializers.csv): Allow specifying fixed column order (#14870)
This commit is contained in:
parent
5847e1d9ee
commit
2942f84aa3
|
|
@ -33,6 +33,20 @@ The `csv` output data format converts metrics into CSV lines.
|
||||||
## Prefix tag and field columns with "tag_" and "field_" respectively.
|
## Prefix tag and field columns with "tag_" and "field_" respectively.
|
||||||
## This can be helpful if you need to know the "type" of a column.
|
## This can be helpful if you need to know the "type" of a column.
|
||||||
# csv_column_prefix = false
|
# csv_column_prefix = false
|
||||||
|
|
||||||
|
## Use the specified order for the columns.
|
||||||
|
## This can be helpful if you need a specific output order. To specify tags,
|
||||||
|
## use a `tag.` prefix, for fields use a `field.` prefix and use `name` and
|
||||||
|
## `timestamp` to reference the measurement name and timestamp respectively.
|
||||||
|
## NOTE: The output will only contain the specified tags, fields, etc. All
|
||||||
|
## other data will be dropped. In case a tag or field does not exist,
|
||||||
|
## the column will be empty.
|
||||||
|
## ex. csv_columns = ["timestamp", "tag.host", "field.value"]
|
||||||
|
##
|
||||||
|
## By default all metric data will be written in the order:
|
||||||
|
## timestamp, name, tags..., fields...
|
||||||
|
## with tags and fields being ordered alphabetically.
|
||||||
|
# csv_columns = []
|
||||||
```
|
```
|
||||||
|
|
||||||
## Examples
|
## Examples
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
|
|
@ -16,10 +17,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
TimestampFormat string `toml:"csv_timestamp_format"`
|
TimestampFormat string `toml:"csv_timestamp_format"`
|
||||||
Separator string `toml:"csv_separator"`
|
Separator string `toml:"csv_separator"`
|
||||||
Header bool `toml:"csv_header"`
|
Header bool `toml:"csv_header"`
|
||||||
Prefix bool `toml:"csv_column_prefix"`
|
Prefix bool `toml:"csv_column_prefix"`
|
||||||
|
Columns []string `toml:"csv_columns"`
|
||||||
|
|
||||||
buffer bytes.Buffer
|
buffer bytes.Buffer
|
||||||
writer *csv.Writer
|
writer *csv.Writer
|
||||||
|
|
@ -45,6 +47,17 @@ func (s *Serializer) Init() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check columns if any
|
||||||
|
for _, name := range s.Columns {
|
||||||
|
switch {
|
||||||
|
case name == "timestamp", name == "name",
|
||||||
|
strings.HasPrefix(name, "tag."),
|
||||||
|
strings.HasPrefix(name, "field."):
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid column reference %q", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize the writer
|
// Initialize the writer
|
||||||
s.writer = csv.NewWriter(&s.buffer)
|
s.writer = csv.NewWriter(&s.buffer)
|
||||||
s.writer.Comma, _ = utf8.DecodeRuneInString(s.Separator)
|
s.writer.Comma, _ = utf8.DecodeRuneInString(s.Separator)
|
||||||
|
|
@ -54,25 +67,7 @@ func (s *Serializer) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
// Clear the buffer
|
return s.SerializeBatch([]telegraf.Metric{metric})
|
||||||
s.buffer.Truncate(0)
|
|
||||||
|
|
||||||
// Write the header if the user wants us to
|
|
||||||
if s.Header {
|
|
||||||
if err := s.writeHeader(metric); err != nil {
|
|
||||||
return nil, fmt.Errorf("writing header failed: %w", err)
|
|
||||||
}
|
|
||||||
s.Header = false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the data
|
|
||||||
if err := s.writeData(metric); err != nil {
|
|
||||||
return nil, fmt.Errorf("writing data failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Finish up
|
|
||||||
s.writer.Flush()
|
|
||||||
return s.buffer.Bytes(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
@ -85,15 +80,27 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
||||||
// Write the header if the user wants us to
|
// Write the header if the user wants us to
|
||||||
if s.Header {
|
if s.Header {
|
||||||
if err := s.writeHeader(metrics[0]); err != nil {
|
if len(s.Columns) > 0 {
|
||||||
return nil, fmt.Errorf("writing header failed: %w", err)
|
if err := s.writeHeaderOrdered(); err != nil {
|
||||||
|
return nil, fmt.Errorf("writing header failed: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := s.writeHeader(metrics[0]); err != nil {
|
||||||
|
return nil, fmt.Errorf("writing header failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.Header = false
|
s.Header = false
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
if err := s.writeData(m); err != nil {
|
if len(s.Columns) > 0 {
|
||||||
return nil, fmt.Errorf("writing data failed: %w", err)
|
if err := s.writeDataOrdered(m); err != nil {
|
||||||
|
return nil, fmt.Errorf("writing data failed: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := s.writeData(m); err != nil {
|
||||||
|
return nil, fmt.Errorf("writing data failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -130,6 +137,21 @@ func (s *Serializer) writeHeader(metric telegraf.Metric) error {
|
||||||
return s.writer.Write(columns)
|
return s.writer.Write(columns)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Serializer) writeHeaderOrdered() error {
|
||||||
|
columns := make([]string, 0, len(s.Columns))
|
||||||
|
for _, name := range s.Columns {
|
||||||
|
if s.Prefix {
|
||||||
|
name = strings.ReplaceAll(name, ".", "_")
|
||||||
|
} else {
|
||||||
|
name = strings.TrimPrefix(name, "tag.")
|
||||||
|
name = strings.TrimPrefix(name, "field.")
|
||||||
|
}
|
||||||
|
columns = append(columns, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.writer.Write(columns)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Serializer) writeData(metric telegraf.Metric) error {
|
func (s *Serializer) writeData(metric telegraf.Metric) error {
|
||||||
var timestamp string
|
var timestamp string
|
||||||
|
|
||||||
|
|
@ -170,6 +192,50 @@ func (s *Serializer) writeData(metric telegraf.Metric) error {
|
||||||
return s.writer.Write(columns)
|
return s.writer.Write(columns)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Serializer) writeDataOrdered(metric telegraf.Metric) error {
|
||||||
|
var timestamp string
|
||||||
|
|
||||||
|
// Format the time
|
||||||
|
switch s.TimestampFormat {
|
||||||
|
case "unix":
|
||||||
|
timestamp = strconv.FormatInt(metric.Time().Unix(), 10)
|
||||||
|
case "unix_ms":
|
||||||
|
timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000_000, 10)
|
||||||
|
case "unix_us":
|
||||||
|
timestamp = strconv.FormatInt(metric.Time().UnixNano()/1_000, 10)
|
||||||
|
case "unix_ns":
|
||||||
|
timestamp = strconv.FormatInt(metric.Time().UnixNano(), 10)
|
||||||
|
default:
|
||||||
|
timestamp = metric.Time().UTC().Format(s.TimestampFormat)
|
||||||
|
}
|
||||||
|
|
||||||
|
columns := make([]string, 0, len(s.Columns))
|
||||||
|
for _, name := range s.Columns {
|
||||||
|
switch {
|
||||||
|
case name == "timestamp":
|
||||||
|
columns = append(columns, timestamp)
|
||||||
|
case name == "name":
|
||||||
|
columns = append(columns, metric.Name())
|
||||||
|
case strings.HasPrefix(name, "tag."):
|
||||||
|
v, _ := metric.GetTag(strings.TrimPrefix(name, "tag."))
|
||||||
|
columns = append(columns, v)
|
||||||
|
case strings.HasPrefix(name, "field."):
|
||||||
|
var v string
|
||||||
|
field := strings.TrimPrefix(name, "field.")
|
||||||
|
if raw, ok := metric.GetField(field); ok {
|
||||||
|
var err error
|
||||||
|
v, err = internal.ToString(raw)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("converting field %q to string failed: %w", field, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
columns = append(columns, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.writer.Write(columns)
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
serializers.Add("csv",
|
serializers.Add("csv",
|
||||||
func() serializers.Serializer {
|
func() serializers.Serializer {
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,22 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
|
||||||
name: "header and semicolon",
|
name: "header and semicolon",
|
||||||
filename: "testcases/semicolon.conf",
|
filename: "testcases/semicolon.conf",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "ordered without header",
|
||||||
|
filename: "testcases/ordered.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered with header",
|
||||||
|
filename: "testcases/ordered_with_header.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered with header and prefix",
|
||||||
|
filename: "testcases/ordered_with_header_prefix.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered non-existing fields and tags",
|
||||||
|
filename: "testcases/ordered_not_exist.conf",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
parser := &influx.Parser{}
|
parser := &influx.Parser{}
|
||||||
require.NoError(t, parser.Init())
|
require.NoError(t, parser.Init())
|
||||||
|
|
@ -93,6 +109,7 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
|
||||||
Separator: cfg.Separator,
|
Separator: cfg.Separator,
|
||||||
Header: cfg.Header,
|
Header: cfg.Header,
|
||||||
Prefix: cfg.Prefix,
|
Prefix: cfg.Prefix,
|
||||||
|
Columns: cfg.Columns,
|
||||||
}
|
}
|
||||||
require.NoError(t, serializer.Init())
|
require.NoError(t, serializer.Init())
|
||||||
// expected results use LF endings
|
// expected results use LF endings
|
||||||
|
|
@ -139,6 +156,22 @@ func TestSerializeTransformationBatch(t *testing.T) {
|
||||||
name: "header and semicolon",
|
name: "header and semicolon",
|
||||||
filename: "testcases/semicolon.conf",
|
filename: "testcases/semicolon.conf",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "ordered without header",
|
||||||
|
filename: "testcases/ordered.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered with header",
|
||||||
|
filename: "testcases/ordered_with_header.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered with header and prefix",
|
||||||
|
filename: "testcases/ordered_with_header_prefix.conf",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ordered non-existing fields and tags",
|
||||||
|
filename: "testcases/ordered_not_exist.conf",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
parser := &influx.Parser{}
|
parser := &influx.Parser{}
|
||||||
require.NoError(t, parser.Init())
|
require.NoError(t, parser.Init())
|
||||||
|
|
@ -166,6 +199,7 @@ func TestSerializeTransformationBatch(t *testing.T) {
|
||||||
Separator: cfg.Separator,
|
Separator: cfg.Separator,
|
||||||
Header: cfg.Header,
|
Header: cfg.Header,
|
||||||
Prefix: cfg.Prefix,
|
Prefix: cfg.Prefix,
|
||||||
|
Columns: cfg.Columns,
|
||||||
}
|
}
|
||||||
require.NoError(t, serializer.Init())
|
require.NoError(t, serializer.Init())
|
||||||
// expected results use LF endings
|
// expected results use LF endings
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
# Example for outputting CSV with a specified column order
|
||||||
|
#
|
||||||
|
# Output File:
|
||||||
|
# testcases/ordered.csv
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000
|
||||||
|
# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000
|
||||||
|
|
||||||
|
csv_timestamp_format = "unix_ns"
|
||||||
|
csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"]
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
1653643420000000000,23.7,987.5,A1
|
||||||
|
1653646789000000000,39.9,1022.6,X9
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
# Example for outputting CSV with a specified column order
|
||||||
|
#
|
||||||
|
# Output File:
|
||||||
|
# testcases/ordered_not_exist.csv
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000
|
||||||
|
# mymetric,machine=X9,host=83d2e491ca01 status="healthy",pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000
|
||||||
|
|
||||||
|
csv_timestamp_format = "unix_ns"
|
||||||
|
csv_header = true
|
||||||
|
csv_columns = ["timestamp", "field.temperature", "field.pressure", "field.status", "tag.location", "tag.machine"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
timestamp,temperature,pressure,status,location,machine
|
||||||
|
1653643420000000000,23.7,987.5,,,A1
|
||||||
|
1653646789000000000,39.9,1022.6,healthy,,X9
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
# Example for outputting CSV with a specified column order
|
||||||
|
#
|
||||||
|
# Output File:
|
||||||
|
# testcases/ordered_with_header.csv
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000
|
||||||
|
# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000
|
||||||
|
|
||||||
|
csv_timestamp_format = "unix_ns"
|
||||||
|
csv_header = true
|
||||||
|
csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
timestamp,temperature,pressure,machine
|
||||||
|
1653643420000000000,23.7,987.5,A1
|
||||||
|
1653646789000000000,39.9,1022.6,X9
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
# Example for outputting CSV with a specified column order
|
||||||
|
#
|
||||||
|
# Output File:
|
||||||
|
# testcases/ordered_with_header_prefix.csv
|
||||||
|
#
|
||||||
|
# Input:
|
||||||
|
# mymetric,machine=A1,host=1cbbb3796fc2 pressure=987.5,temperature=23.7,hours=15i 1653643420000000000
|
||||||
|
# mymetric,machine=X9,host=83d2e491ca01 pressure=1022.6,temperature=39.9,hours=231i 1653646789000000000
|
||||||
|
|
||||||
|
csv_timestamp_format = "unix_ns"
|
||||||
|
csv_header = true
|
||||||
|
csv_column_prefix = true
|
||||||
|
csv_columns = ["timestamp", "field.temperature", "field.pressure", "tag.machine"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
timestamp,field_temperature,field_pressure,tag_machine
|
||||||
|
1653643420000000000,23.7,987.5,A1
|
||||||
|
1653646789000000000,39.9,1022.6,X9
|
||||||
|
Loading…
Reference in New Issue