feat: Add CSV serializer (#11307)

This commit is contained in:
Sven Rebhan 2022-06-22 21:06:50 +02:00 committed by GitHub
parent db23718f14
commit 48fa1990ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 512 additions and 2 deletions

View File

@ -1676,8 +1676,11 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldStringSlice(tbl, "templates", &sc.Templates) c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format) c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar) c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix)
c.getFieldBool(tbl, "csv_header", &sc.CSVHeader)
c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator)
c.getFieldString(tbl, "csv_timestamp_format", &sc.TimestampFormat)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes) c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport) c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport) c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport)
@ -1744,6 +1747,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"collection_offset", "collection_offset",
"csv_separator", "csv_header", "csv_column_prefix", "csv_timestamp_format",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path", "dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",

View File

@ -6,6 +6,7 @@ plugins.
1. [InfluxDB Line Protocol](/plugins/serializers/influx) 1. [InfluxDB Line Protocol](/plugins/serializers/influx)
1. [Carbon2](/plugins/serializers/carbon2) 1. [Carbon2](/plugins/serializers/carbon2)
1. [CSV](/plugins/serializers/csv)
1. [Graphite](/plugins/serializers/graphite) 1. [Graphite](/plugins/serializers/graphite)
1. [JSON](/plugins/serializers/json) 1. [JSON](/plugins/serializers/json)
1. [MessagePack](/plugins/serializers/msgpack) 1. [MessagePack](/plugins/serializers/msgpack)

View File

@ -0,0 +1,55 @@
# CSV Serializer
The `csv` output data format converts metrics into CSV lines.
## Configuration
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "csv"
## The default timestamp format is Unix epoch time.
# Other timestamp layouts can be configured using the Go language time
# layout specification from https://golang.org/pkg/time/#Time.Format
# e.g.: csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
# csv_timestamp_format = "unix"
## The default separator for the CSV format.
# csv_separator = ","
## Output the CSV header in the first line.
## Enable the header when outputting metrics to a new file.
## Disable when appending to a file or when using a stateless
## output to prevent headers appearing between data lines.
# csv_header = false
## Prefix tag and field columns with "tag_" and "field_" respectively.
## This can be helpful if you need to know the "type" of a column.
# csv_column_prefix = false
```
## Examples
Standard form:
```csv
1458229140,docker,raynor,30,4,...,59,660
```
When an output plugin needs to emit multiple metrics at one time, it may use
the batch format. Whether the batch format is used is determined by the
plugin; refer to the documentation of the specific plugin. With
`csv_header = true` you get
```csv
timestamp,measurement,host,field_1,field_2,...,field_N,n_images
1458229140,docker,raynor,30,4,...,59,660
1458229143,docker,raynor,28,5,...,60,665
```

View File

@ -0,0 +1,176 @@
package csv
import (
"bytes"
"encoding/csv"
"fmt"
"runtime"
"sort"
"strconv"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
// Serializer converts Telegraf metrics into CSV records. It is stateful:
// when Header is enabled, the header line is written only once (before the
// first record), and an internal buffer plus csv.Writer are reused across
// calls, so it must not be used concurrently.
type Serializer struct {
TimestampFormat string `toml:"csv_timestamp_format"` // "unix", "unix_ms", "unix_us", "unix_ns" or a Go time layout
Separator string `toml:"csv_separator"` // single-rune column separator (default ",")
Header bool `toml:"csv_header"` // emit a column-name header before the first record
Prefix bool `toml:"csv_column_prefix"` // prefix columns with "tag_" / "field_"
buffer bytes.Buffer // scratch output buffer, reused between calls
writer *csv.Writer // CSV writer bound to buffer
}
// NewSerializer creates a CSV serializer with the given settings.
//
// An empty separator defaults to "," and an empty timestampFormat defaults
// to "unix". The separator must be exactly one rune; timestampFormat must
// be one of the "unix*" forms or a valid Go time layout
// (https://pkg.go.dev/time#Time.Format).
func NewSerializer(timestampFormat, separator string, header, prefix bool) (*Serializer, error) {
	// Setting defaults
	if separator == "" {
		separator = ","
	}

	// Check inputs. Count runes, not bytes: csv.Writer.Comma is a single
	// rune, so a one-rune multi-byte UTF-8 separator (e.g. "¢") is valid
	// and must not be rejected by a byte-length check.
	runes := []rune(separator)
	if len(runes) > 1 {
		return nil, fmt.Errorf("invalid separator %q", separator)
	}

	switch timestampFormat {
	case "":
		timestampFormat = "unix"
	case "unix", "unix_ms", "unix_us", "unix_ns":
	default:
		// Heuristic layout validation: a string containing no layout
		// elements formats to itself, while a real layout changes.
		if time.Now().Format(timestampFormat) == timestampFormat {
			return nil, fmt.Errorf("invalid timestamp format %q", timestampFormat)
		}
	}

	s := &Serializer{
		TimestampFormat: timestampFormat,
		Separator:       separator,
		Header:          header,
		Prefix:          prefix,
	}

	// Initialize the writer on the internal buffer.
	s.writer = csv.NewWriter(&s.buffer)
	s.writer.Comma = runes[0]
	s.writer.UseCRLF = runtime.GOOS == "windows"

	return s, nil
}
// Serialize converts a single metric into one CSV record, preceded by the
// header line on the first call if Header is enabled.
//
// The returned slice is a copy: returning s.buffer.Bytes() directly would
// alias memory that the next call truncates and overwrites, corrupting any
// bytes the caller still holds.
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
	// Clear the buffer
	s.buffer.Truncate(0)

	// Write the header if the user wants us to
	if s.Header {
		if err := s.writeHeader(metric); err != nil {
			return nil, fmt.Errorf("writing header failed: %w", err)
		}
		s.Header = false
	}

	// Write the data
	if err := s.writeData(metric); err != nil {
		return nil, fmt.Errorf("writing data failed: %w", err)
	}

	// Finish up and hand back a detached copy of the buffer contents.
	s.writer.Flush()
	out := make([]byte, s.buffer.Len())
	copy(out, s.buffer.Bytes())
	return out, nil
}
// SerializeBatch converts a batch of metrics into CSV records, one per
// metric, preceded by a single header line (derived from the first metric)
// if Header is enabled. Returns nil for an empty batch.
//
// The returned slice is a copy so it stays valid after later calls reuse
// the internal buffer.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	if len(metrics) < 1 {
		return nil, nil
	}

	// Clear the buffer
	s.buffer.Truncate(0)

	// Write the header if the user wants us to. The column set is taken
	// from the first metric; all metrics in the batch are assumed to share
	// the same tags and fields.
	if s.Header {
		if err := s.writeHeader(metrics[0]); err != nil {
			return nil, fmt.Errorf("writing header failed: %w", err)
		}
		s.Header = false
	}

	for _, m := range metrics {
		if err := s.writeData(m); err != nil {
			return nil, fmt.Errorf("writing data failed: %w", err)
		}
	}

	// Finish up and hand back a detached copy of the buffer contents.
	s.writer.Flush()
	out := make([]byte, s.buffer.Len())
	copy(out, s.buffer.Bytes())
	return out, nil
}
// writeHeader emits the column-name record for the given metric:
// "timestamp", "measurement", the tag keys (in tag-list order) and the
// field keys (sorted by name). Keys get a "tag_"/"field_" prefix when
// Prefix is set.
func (s *Serializer) writeHeader(metric telegraf.Metric) error {
	tags := metric.TagList()
	fields := metric.FieldList()

	columns := make([]string, 0, 2+len(tags)+len(fields))
	columns = append(columns, "timestamp", "measurement")

	for _, tag := range tags {
		name := tag.Key
		if s.Prefix {
			name = "tag_" + name
		}
		columns = append(columns, name)
	}

	// Sort the fields by name so the column order is deterministic and
	// matches writeData.
	sort.Slice(fields, func(i, j int) bool {
		return fields[i].Key < fields[j].Key
	})
	for _, field := range fields {
		name := field.Key
		if s.Prefix {
			name = "field_" + name
		}
		columns = append(columns, name)
	}

	return s.writer.Write(columns)
}
// writeData emits one data record for the metric: the formatted timestamp,
// the measurement name, the tag values (in tag-list order) and the field
// values (sorted by field name, matching the header order).
func (s *Serializer) writeData(metric telegraf.Metric) error {
	// Format the time according to the configured timestamp format.
	ts := metric.Time()
	var timestamp string
	switch s.TimestampFormat {
	case "unix":
		timestamp = strconv.FormatInt(ts.Unix(), 10)
	case "unix_ms":
		timestamp = strconv.FormatInt(ts.UnixNano()/1_000_000, 10)
	case "unix_us":
		timestamp = strconv.FormatInt(ts.UnixNano()/1_000, 10)
	case "unix_ns":
		timestamp = strconv.FormatInt(ts.UnixNano(), 10)
	default:
		timestamp = ts.UTC().Format(s.TimestampFormat)
	}

	tags := metric.TagList()
	fields := metric.FieldList()

	columns := make([]string, 0, 2+len(tags)+len(fields))
	columns = append(columns, timestamp, metric.Name())

	for _, tag := range tags {
		columns = append(columns, tag.Value)
	}

	// Sort the fields by name to match the header column order.
	sort.Slice(fields, func(i, j int) bool {
		return fields[i].Key < fields[j].Key
	})
	for _, field := range fields {
		v, err := internal.ToString(field.Value)
		if err != nil {
			return fmt.Errorf("converting field %q to string failed: %w", field.Key, err)
		}
		columns = append(columns, v)
	}

	return s.writer.Write(columns)
}

View File

@ -0,0 +1,181 @@
package csv
import (
"bytes"
"os"
"path/filepath"
"strings"
"testing"
"github.com/influxdata/toml"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
// TestInvalidTimestampFormat checks that a string containing no Go time
// layout elements is rejected by the constructor.
func TestInvalidTimestampFormat(t *testing.T) {
_, err := NewSerializer("garbage", "", false, false)
require.EqualError(t, err, `invalid timestamp format "garbage"`)
}
// TestInvalidSeparator checks that a multi-character separator is rejected
// by the constructor, and that a separator the csv package itself forbids
// ("\n") surfaces as a write-time error.
func TestInvalidSeparator(t *testing.T) {
_, err := NewSerializer("", "garbage", false, false)
require.EqualError(t, err, `invalid separator "garbage"`)
serializer, err := NewSerializer("", "\n", false, false)
require.NoError(t, err)
_, err = serializer.Serialize(testutil.TestMetric(42.3, "test"))
require.EqualError(t, err, "writing data failed: csv: invalid field or comment delimiter")
}
// TestSerializeTransformationNonBatch runs every file-based testcase
// through Serialize one metric at a time and compares the concatenated
// output with the expected CSV file referenced by the testcase header.
func TestSerializeTransformationNonBatch(t *testing.T) {
	testcases := []struct {
		name     string
		filename string
	}{
		{name: "basic", filename: "testcases/basic.conf"},
		{name: "unix nanoseconds timestamp", filename: "testcases/nanoseconds.conf"},
		{name: "header", filename: "testcases/header.conf"},
		{name: "header with prefix", filename: "testcases/prefix.conf"},
		{name: "header and RFC3339 timestamp", filename: "testcases/rfc3339.conf"},
		{name: "header and semicolon", filename: "testcases/semicolon.conf"},
	}

	parser := influx.NewParser(influx.NewMetricHandler())
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			path := filepath.FromSlash(tc.filename)
			cfg, header, err := loadTestConfiguration(path)
			require.NoError(t, err)

			// Input metrics come from the "# Input:" comment section.
			metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
			require.NoError(t, err)

			// The expectation is the file named in "# Output File:".
			outFiles, err := testutil.ParseRawLinesFrom(header, "Output File:")
			require.NoError(t, err)
			require.Len(t, outFiles, 1, "only a single output file is supported")
			expected, err := loadCSV(outFiles[0])
			require.NoError(t, err)

			// Serialize the metrics one-by-one and collect the output.
			serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix)
			require.NoError(t, err)
			var actual bytes.Buffer
			for _, m := range metrics {
				buf, err := serializer.Serialize(m)
				require.NoError(t, err)
				_, err = actual.Write(buf)
				require.NoError(t, err)
			}

			// Compare
			require.EqualValues(t, string(expected), actual.String())
		})
	}
}
// TestSerializeTransformationBatch runs every file-based testcase through
// SerializeBatch in one call and compares the result with the expected
// CSV file referenced by the testcase header.
func TestSerializeTransformationBatch(t *testing.T) {
	testcases := []struct {
		name     string
		filename string
	}{
		{name: "basic", filename: "testcases/basic.conf"},
		{name: "unix nanoseconds timestamp", filename: "testcases/nanoseconds.conf"},
		{name: "header", filename: "testcases/header.conf"},
		{name: "header with prefix", filename: "testcases/prefix.conf"},
		{name: "header and RFC3339 timestamp", filename: "testcases/rfc3339.conf"},
		{name: "header and semicolon", filename: "testcases/semicolon.conf"},
	}

	parser := influx.NewParser(influx.NewMetricHandler())
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			path := filepath.FromSlash(tc.filename)
			cfg, header, err := loadTestConfiguration(path)
			require.NoError(t, err)

			// Input metrics come from the "# Input:" comment section.
			metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
			require.NoError(t, err)

			// The expectation is the file named in "# Output File:".
			outFiles, err := testutil.ParseRawLinesFrom(header, "Output File:")
			require.NoError(t, err)
			require.Len(t, outFiles, 1, "only a single output file is supported")
			expected, err := loadCSV(outFiles[0])
			require.NoError(t, err)

			// Serialize the whole batch in one call.
			serializer, err := NewSerializer(cfg.TimestampFormat, cfg.Separator, cfg.Header, cfg.Prefix)
			require.NoError(t, err)
			actual, err := serializer.SerializeBatch(metrics)
			require.NoError(t, err)

			// Compare
			require.EqualValues(t, string(expected), string(actual))
		})
	}
}
// Config mirrors Serializer so the csv_* options of a testcase file can be
// unmarshalled directly through the struct's toml tags.
type Config Serializer
// loadTestConfiguration reads a testcase file and returns both the parsed
// csv_* options and every comment ("#"-prefixed) line; the comments carry
// the input metrics and the expected-output filename.
func loadTestConfiguration(filename string) (*Config, []string, error) {
	buf, err := os.ReadFile(filename)
	if err != nil {
		return nil, nil, err
	}

	// Collect the comment lines of the file header.
	header := make([]string, 0)
	for _, raw := range strings.Split(string(buf), "\n") {
		if line := strings.TrimSpace(raw); strings.HasPrefix(line, "#") {
			header = append(header, line)
		}
	}

	var cfg Config
	err = toml.Unmarshal(buf, &cfg)
	return &cfg, header, err
}
// loadCSV reads the expected-output CSV file verbatim.
func loadCSV(filename string) ([]byte, error) {
return os.ReadFile(filename)
}

View File

@ -0,0 +1,8 @@
# Example for outputting CSV
#
# Output File:
# testcases/basic.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000

View File

@ -0,0 +1,2 @@
1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5
1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42
1 1653643420 impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
2 1653646789 expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -0,0 +1,10 @@
# Example for outputting CSV in non-batch mode.
#
# Output File:
# testcases/header.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
csv_header = true

View File

@ -0,0 +1,3 @@
timestamp,measurement,flagname,host,key,platform,sdkver,value,count_sum
1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5
1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42
1 timestamp measurement flagname host key platform sdkver value count_sum
2 1653643420 impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
3 1653646789 expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -0,0 +1,10 @@
# Example for outputting CSV
#
# Output File:
# testcases/nanoseconds.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420123456
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789789012
csv_timestamp_format = "unix_ns"

View File

@ -0,0 +1,2 @@
1653643420123456,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5
1653646789789012,expression,E42,klaus,67890,Golang,1.18.3,true,42
1 1653643420123456 impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
2 1653646789789012 expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -0,0 +1,11 @@
# Example for outputting CSV in non-batch mode.
#
# Output File:
# testcases/prefix.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
csv_header = true
csv_column_prefix = true

View File

@ -0,0 +1,3 @@
timestamp,measurement,tag_flagname,tag_host,tag_key,tag_platform,tag_sdkver,tag_value,field_count_sum
1653643420,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5
1653646789,expression,E42,klaus,67890,Golang,1.18.3,true,42
1 timestamp measurement tag_flagname tag_host tag_key tag_platform tag_sdkver tag_value field_count_sum
2 1653643420 impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
3 1653646789 expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -0,0 +1,11 @@
# Example for outputting CSV in non-batch mode.
#
# Output File:
# testcases/rfc3339.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
csv_header = true

View File

@ -0,0 +1,3 @@
timestamp,measurement,flagname,host,key,platform,sdkver,value,count_sum
2022-05-27T09:23:40Z,impression,F5,1cbbb3796fc2,12345,Java,4.9.1,false,5
2022-05-27T10:19:49Z,expression,E42,klaus,67890,Golang,1.18.3,true,42
1 timestamp measurement flagname host key platform sdkver value count_sum
2 2022-05-27T09:23:40Z impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
3 2022-05-27T10:19:49Z expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -0,0 +1,11 @@
# Example for outputting CSV in non-batch mode.
#
# Output File:
# testcases/semicolon.csv
#
# Input:
# impression,flagname=F5,host=1cbbb3796fc2,key=12345,platform=Java,sdkver=4.9.1,value=false count_sum=5i 1653643420000000000
# expression,flagname=E42,host=klaus,key=67890,platform=Golang,sdkver=1.18.3,value=true count_sum=42i 1653646789000000000
csv_separator = ";"
csv_header = true

View File

@ -0,0 +1,3 @@
timestamp;measurement;flagname;host;key;platform;sdkver;value;count_sum
1653643420;impression;F5;1cbbb3796fc2;12345;Java;4.9.1;false;5
1653646789;expression;E42;klaus;67890;Golang;1.18.3;true;42
1 timestamp measurement flagname host key platform sdkver value count_sum
2 1653643420 impression F5 1cbbb3796fc2 12345 Java 4.9.1 false 5
3 1653646789 expression E42 klaus 67890 Golang 1.18.3 true 42

View File

@ -6,6 +6,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/csv"
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/json"
@ -56,6 +57,15 @@ type Config struct {
// Character used for metric name sanitization in Carbon2. // Character used for metric name sanitization in Carbon2.
Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"` Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`
// Separator for CSV
CSVSeparator string `toml:"csv_separator"`
// Output a CSV header for naming the columns
CSVHeader bool `toml:"csv_header"`
// Prefix the tag and field columns for CSV format
CSVPrefix bool `toml:"csv_column_prefix"`
// Support tags in graphite protocol // Support tags in graphite protocol
GraphiteTagSupport bool `toml:"graphite_tag_support"` GraphiteTagSupport bool `toml:"graphite_tag_support"`
@ -88,7 +98,7 @@ type Config struct {
// Timestamp units to use for JSON formatted output // Timestamp units to use for JSON formatted output
TimestampUnits time.Duration `toml:"timestamp_units"` TimestampUnits time.Duration `toml:"timestamp_units"`
// Timestamp format to use for JSON formatted output // Timestamp format to use for JSON and CSV formatted output
TimestampFormat string `toml:"timestamp_format"` TimestampFormat string `toml:"timestamp_format"`
// Include HEC routing fields for splunkmetric output // Include HEC routing fields for splunkmetric output
@ -124,6 +134,8 @@ func NewSerializer(config *Config) (Serializer, error) {
var err error var err error
var serializer Serializer var serializer Serializer
switch config.DataFormat { switch config.DataFormat {
case "csv":
serializer, err = NewCSVSerializer(config)
case "influx": case "influx":
serializer, err = NewInfluxSerializerConfig(config) serializer, err = NewInfluxSerializerConfig(config)
case "graphite": case "graphite":
@ -150,6 +162,10 @@ func NewSerializer(config *Config) (Serializer, error) {
return serializer, err return serializer, err
} }
// NewCSVSerializer creates a CSV serializer from the generic serializer
// config, forwarding the csv_* options and the shared timestamp format.
func NewCSVSerializer(config *Config) (Serializer, error) {
return csv.NewSerializer(config.TimestampFormat, config.CSVSeparator, config.CSVHeader, config.CSVPrefix)
}
func NewPrometheusRemoteWriteSerializer(config *Config) (Serializer, error) { func NewPrometheusRemoteWriteSerializer(config *Config) (Serializer, error) {
sortMetrics := prometheusremotewrite.NoSortMetrics sortMetrics := prometheusremotewrite.NoSortMetrics
if config.PrometheusExportTimestamp { if config.PrometheusExportTimestamp {