feat(parsers/csv): Add metadata support to CSV parser plugin (#10083)

Ehsan 2022-02-24 11:28:16 +10:00 committed by GitHub
parent 10cc56039a
commit 5adecc3cd9
5 changed files with 412 additions and 43 deletions

View File

@@ -377,7 +377,7 @@ func TestHTTPWithCSVFormat(t *testing.T) {
plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := &csv.Parser{
MetricName: "metricName",
-SkipRows: 2,
+SkipRows: 3,
ColumnNames: []string{"a", "b", "c"},
TagColumns: []string{"c"},
}

View File

@@ -33,8 +33,26 @@ values.
## If this is not specified, type conversion will be done on the types above.
csv_column_types = []
-## Indicates the number of rows to skip before looking for header information.
+## Indicates the number of rows to skip before looking for metadata and header information.
csv_skip_rows = 0
## Indicates the number of rows to parse as metadata before looking for header information.
## By default, the parser assumes there are no metadata rows to parse.
## If set, the parser uses the separators provided in csv_metadata_separators to look for metadata.
## Please note that the parsed (key, value) pairs are added as tags.
csv_metadata_rows = 0
## A list of metadata separators. If csv_metadata_rows is set,
## csv_metadata_separators must contain at least one separator.
## Please note that separators are case sensitive. Longer separators are tried first;
## among separators of equal length, the configured order is respected.
csv_metadata_separators = [":", "="]
## A set of metadata trim characters.
## If csv_metadata_trim_set is not set, no trimming is performed.
## Please note that the trim set is case sensitive.
csv_metadata_trim_set = ""
## Indicates the number of columns to skip before looking for data to parse.
## These columns will be skipped in the header as well.
@@ -85,7 +103,7 @@ values.
### csv_timestamp_column, csv_timestamp_format
-By default the current time will be used for all created metrics, to set the
+By default, the current time will be used for all created metrics; to set the
time using the CSV document you can use the `csv_timestamp_column` and
`csv_timestamp_format` options together to set the time to a value in the parsed
document.
@@ -121,15 +139,45 @@ Config:
Input:
-```shell
+```csv
measurement,cpu,time_user,time_system,time_idle,time
cpu,cpu0,42,42,42,2018-09-13T13:03:28Z
```
Output:
-```shell
+```text
cpu cpu=cpu0,time_user=42,time_system=42,time_idle=42 1536869008000000000
```
Config:
```toml
[[inputs.file]]
files = ["example"]
data_format = "csv"
csv_metadata_rows = 2
csv_metadata_separators = [":", "="]
csv_metadata_trim_set = " #"
csv_header_row_count = 1
csv_tag_columns = ["Version","File Created"]
csv_timestamp_column = "time"
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
```
Input:
```csv
# Version=1.1
# File Created: 2021-11-17T07:02:45+10:00
measurement,cpu,time_user,time_system,time_idle,time
cpu,cpu0,42,42,42,2018-09-13T13:03:28Z
```
Output:
```text
cpu,File\ Created=2021-11-17T07:02:45+10:00,Version=1.1 cpu=cpu0,time_user=42,time_system=42,time_idle=42 1536869008000000000
```
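In this example the two metadata rows are consumed before the header, and the parsed `Version` and `File Created` pairs are attached as tags to every metric; note the escaped space in the `File\ Created` tag key of the output.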
[metric filtering]: /docs/CONFIGURATION.md#metric-filtering

View File

@@ -1,10 +1,12 @@
package csv
import (
"bufio"
"bytes"
"encoding/csv"
"fmt"
"io"
"sort"
"strconv"
"strings"
"time"
@@ -20,28 +22,91 @@ import (
type TimeFunc func() time.Time
type Parser struct {
-	ColumnNames       []string        `toml:"csv_column_names"`
-	ColumnTypes       []string        `toml:"csv_column_types"`
-	Comment           string          `toml:"csv_comment"`
-	Delimiter         string          `toml:"csv_delimiter"`
-	HeaderRowCount    int             `toml:"csv_header_row_count"`
-	MeasurementColumn string          `toml:"csv_measurement_column"`
-	MetricName        string          `toml:"metric_name"`
-	SkipColumns       int             `toml:"csv_skip_columns"`
-	SkipRows          int             `toml:"csv_skip_rows"`
-	TagColumns        []string        `toml:"csv_tag_columns"`
-	TimestampColumn   string          `toml:"csv_timestamp_column"`
-	TimestampFormat   string          `toml:"csv_timestamp_format"`
-	Timezone          string          `toml:"csv_timezone"`
-	TrimSpace         bool            `toml:"csv_trim_space"`
-	SkipValues        []string        `toml:"csv_skip_values"`
-	SkipErrors        bool            `toml:"csv_skip_errors"`
-	Log               telegraf.Logger `toml:"-"`
+	ColumnNames        []string        `toml:"csv_column_names"`
+	ColumnTypes        []string        `toml:"csv_column_types"`
+	Comment            string          `toml:"csv_comment"`
+	Delimiter          string          `toml:"csv_delimiter"`
+	HeaderRowCount     int             `toml:"csv_header_row_count"`
+	MeasurementColumn  string          `toml:"csv_measurement_column"`
+	MetricName         string          `toml:"metric_name"`
+	SkipColumns        int             `toml:"csv_skip_columns"`
+	SkipRows           int             `toml:"csv_skip_rows"`
+	TagColumns         []string        `toml:"csv_tag_columns"`
+	TimestampColumn    string          `toml:"csv_timestamp_column"`
+	TimestampFormat    string          `toml:"csv_timestamp_format"`
+	Timezone           string          `toml:"csv_timezone"`
+	TrimSpace          bool            `toml:"csv_trim_space"`
+	SkipValues         []string        `toml:"csv_skip_values"`
+	SkipErrors         bool            `toml:"csv_skip_errors"`
+	MetadataRows       int             `toml:"csv_metadata_rows"`
+	MetadataSeparators []string        `toml:"csv_metadata_separators"`
+	MetadataTrimSet    string          `toml:"csv_metadata_trim_set"`
+	Log                telegraf.Logger `toml:"-"`
+	metadataSeparatorList metadataPattern
	gotColumnNames bool
-	TimeFunc    func() time.Time
-	DefaultTags map[string]string
+	TimeFunc     func() time.Time
+	DefaultTags  map[string]string
+	metadataTags map[string]string
}
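// metadataPattern implements sort.Interface so the configured separators can be ordered longest-first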
type metadataPattern []string
func (record metadataPattern) Len() int {
return len(record)
}
func (record metadataPattern) Swap(i, j int) {
record[i], record[j] = record[j], record[i]
}
func (record metadataPattern) Less(i, j int) bool {
// Metadata with longer lengths should be ordered before shorter metadata
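// (for example, ":=" must come before ":" so that parseMetadataRow matches the longest separator first)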
return len(record[i]) > len(record[j])
}
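// initializeMetadataSeparators validates the metadata options, drops duplicate separators and sorts them longest-first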
func (p *Parser) initializeMetadataSeparators() error {
// initialize metadata
p.metadataTags = map[string]string{}
p.metadataSeparatorList = []string{}
if p.MetadataRows <= 0 {
return nil
}
if len(p.MetadataSeparators) == 0 {
return fmt.Errorf("csv_metadata_separator required when specifying csv_metadata_rows")
}
p.metadataSeparatorList = metadataPattern{}
patternList := map[string]bool{}
for _, pattern := range p.MetadataSeparators {
if patternList[pattern] {
// Ignore further, duplicated entries
continue
}
patternList[pattern] = true
p.metadataSeparatorList = append(p.metadataSeparatorList, pattern)
}
sort.Stable(p.metadataSeparatorList)
return nil
}
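// parseMetadataRow splits a raw metadata line on the first matching separator and returns the trimmed (key, value) pair, or nil when no separator yields a non-empty key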
func (p *Parser) parseMetadataRow(haystack string) map[string]string {
haystack = strings.TrimRight(haystack, "\r\n")
for _, needle := range p.metadataSeparatorList {
metadata := strings.SplitN(haystack, needle, 2)
if len(metadata) < 2 {
continue
}
key := strings.Trim(metadata[0], p.MetadataTrimSet)
if len(key) > 0 {
value := strings.Trim(metadata[1], p.MetadataTrimSet)
return map[string]string{key: value}
}
}
return nil
}
func (p *Parser) Init() error {
@@ -67,6 +132,10 @@ func (p *Parser) Init() error {
return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
}
if err := p.initializeMetadataSeparators(); err != nil {
return fmt.Errorf("initializing separators failed: %v", err)
}
p.gotColumnNames = len(p.ColumnNames) > 0
if p.TimeFunc == nil {
@@ -99,9 +168,17 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return parseCSV(p, r)
}
// ParseLine does not use any information in the header and assumes ColumnNames is set;
// while SkipRows or MetadataRows are outstanding it consumes the line as a skip/metadata row instead of data
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
if len(line) == 0 {
if p.SkipRows > 0 {
p.SkipRows--
return nil, io.EOF
}
if p.MetadataRows > 0 {
p.MetadataRows--
return nil, io.EOF
}
}
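// non-empty lines fall through to parseCSV, which performs its own skip-row and metadata accounting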
r := bytes.NewReader([]byte(line))
metrics, err := parseCSV(p, r)
if err != nil {
@@ -117,15 +194,28 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
}
func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) {
-csvReader := p.compile(r)
+lineReader := bufio.NewReader(r)
// skip first rows
for p.SkipRows > 0 {
-_, err := csvReader.Read()
-if err != nil {
+line, err := lineReader.ReadString('\n')
+if err != nil && len(line) == 0 {
return nil, err
}
p.SkipRows--
}
// Parse metadata
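// metadata rows are consumed from the buffered reader before the csv.Reader is created, so they never reach the CSV decoder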
for p.MetadataRows > 0 {
line, err := lineReader.ReadString('\n')
if err != nil && len(line) == 0 {
return nil, err
}
p.MetadataRows--
m := p.parseMetadataRow(line)
for k, v := range m {
p.metadataTags[k] = v
}
}
csvReader := p.compile(lineReader)
// if there is a header, and we did not get ColumnNames,
// set ColumnNames to names extracted from the header
// we always reread the header to avoid side effects
@@ -261,6 +351,11 @@ outer:
}
}
// add metadata tags
for k, v := range p.metadataTags {
tags[k] = v
}
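// default tags are applied after the metadata tags, so they take precedence on key collisions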
// add default tags
for k, v := range p.DefaultTags {
tags[k] = v
@@ -342,6 +437,9 @@ func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.Timezone = config.CSVTimezone
p.DefaultTags = config.DefaultTags
p.SkipValues = config.CSVSkipValues
p.MetadataRows = config.CSVMetadataRows
p.MetadataSeparators = config.CSVMetadataSeparators
p.MetadataTrimSet = config.CSVMetadataTrimSet
return p.Init()
}

View File

@@ -827,3 +827,222 @@ corrupted_line
require.Equal(t, expectedFields0, metrics[0].Fields())
require.Equal(t, expectedFields1, metrics[1].Fields())
}
func TestParseMetadataSeparators(t *testing.T) {
p := &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 0,
MetadataSeparators: []string{},
}
err := p.Init()
require.NoError(t, err)
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{},
}
err = p.Init()
require.Error(t, err)
require.Equal(t, "initializing separators failed: "+
"csv_metadata_separator required when specifying csv_metadata_rows", err.Error())
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{",", "=", ",", ":", "=", ":="},
}
err = p.Init()
require.NoError(t, err)
require.Len(t, p.metadataSeparatorList, 4)
require.Len(t, p.MetadataTrimSet, 0)
require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", "=", ":"})
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{",", ":", "=", ":="},
MetadataTrimSet: " #'",
}
err = p.Init()
require.NoError(t, err)
require.Len(t, p.metadataSeparatorList, 4)
require.Len(t, p.MetadataTrimSet, 3)
require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", ":", "="})
}
func TestParseMetadataRow(t *testing.T) {
p := &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 5,
MetadataSeparators: []string{":=", ",", ":", "="},
}
err := p.Init()
require.NoError(t, err)
require.Empty(t, p.metadataTags)
m := p.parseMetadataRow("# this is a not matching string")
require.Nil(t, m)
m = p.parseMetadataRow("# key1 : value1 \r\n")
require.Equal(t, m, map[string]string{"# key1 ": " value1 "})
m = p.parseMetadataRow("key2=1234\n")
require.Equal(t, m, map[string]string{"key2": "1234"})
m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n")
require.Equal(t, m, map[string]string{" file created ": " 2021-10-08T12:34:18+10:00 "})
m = p.parseMetadataRow("file created: 2021-10-08T12:34:18\t\r\r\n")
require.Equal(t, m, map[string]string{"file created": " 2021-10-08T12:34:18\t"})
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 5,
MetadataSeparators: []string{":=", ",", ":", "="},
MetadataTrimSet: " #'",
}
err = p.Init()
require.NoError(t, err)
require.Empty(t, p.metadataTags)
m = p.parseMetadataRow("# this is a not matching string")
require.Nil(t, m)
m = p.parseMetadataRow("# key1 : value1 \r\n")
require.Equal(t, m, map[string]string{"key1": "value1"})
m = p.parseMetadataRow("key2=1234\n")
require.Equal(t, m, map[string]string{"key2": "1234"})
m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n")
require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18+10:00"})
m = p.parseMetadataRow("file created: '2021-10-08T12:34:18'\r\n")
require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18"})
}
func TestParseCSVFileWithMetadata(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
}
err := p.Init()
require.NoError(t, err)
testCSV := `garbage nonsense that needs be skipped
# version= 1.0
invalid meta data that can be ignored.
file created: 2021-10-08T12:34:18+10:00
timestamp,type,name,status
2020-11-23T08:19:27+10:00,Reader,R002,1
#2020-11-04T13:23:04+10:00,Reader,R031,0
2020-11-04T13:29:47+10:00,Coordinator,C001,0`
expectedFields := []map[string]interface{}{
{
"name": "R002",
"status": int64(1),
"timestamp": "2020-11-23T08:19:27+10:00",
},
{
"name": "C001",
"status": int64(0),
"timestamp": "2020-11-04T13:29:47+10:00",
},
}
expectedTags := []map[string]string{
{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
}
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
for i, m := range metrics {
require.Equal(t, expectedFields[i], m.Fields())
require.Equal(t, expectedTags[i], m.Tags())
}
p = &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type", "version"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
}
err = p.Init()
require.NoError(t, err)
testCSVRows := []string{
"garbage nonsense that needs be skipped",
"",
"# version= 1.0\r\n",
"",
" invalid meta data that can be ignored.\r\n",
"file created: 2021-10-08T12:34:18+10:00",
"timestamp,type,name,status\n",
"2020-11-23T08:19:27+10:00,Reader,R002,1\r\n",
"#2020-11-04T13:23:04+10:00,Reader,R031,0\n",
"2020-11-04T13:29:47+10:00,Coordinator,C001,0",
}
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
rowIndex := 0
for ; rowIndex < 6; rowIndex++ {
m, err := p.ParseLine(testCSVRows[rowIndex])
require.ErrorIs(t, err, io.EOF)
require.Nil(t, m)
}
m, err := p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Nil(t, m)
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Equal(t, expectedFields[0], m.Fields())
require.Equal(t, expectedTags[0], m.Tags())
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Nil(t, m)
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Equal(t, expectedFields[1], m.Fields())
require.Equal(t, expectedTags[1], m.Tags())
}
func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
// This test makes sure that metadata tags overwrite the tag columns parsed from the record,
// and that default tags in turn overwrite both metadata and record tags
p := &Parser{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"second", "third"},
TimeFunc: DefaultTime,
MetadataRows: 2,
MetadataSeparators: []string{"="},
}
err := p.Init()
require.NoError(t, err)
p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"})
m, err := p.ParseLine("second=orange")
require.ErrorIs(t, err, io.EOF)
require.Nil(t, m)
m, err = p.ParseLine("fourth=plain")
require.NoError(t, err)
require.Nil(t, m)
expectedFields := []map[string]interface{}{{"first": 1.4}}
expectedTags := []map[string]string{{"second": "orange", "third": "bye", "fourth": "car"}}
m, err = p.ParseLine("1.4,apple,hi")
require.NoError(t, err)
require.Equal(t, expectedFields[0], m.Fields())
require.Equal(t, expectedTags[0], m.Tags())
}

View File

@@ -154,20 +154,24 @@ type Config struct {
GrokUniqueTimestamp string `toml:"grok_unique_timestamp"`
//csv configuration
-	CSVColumnNames       []string `toml:"csv_column_names"`
-	CSVColumnTypes       []string `toml:"csv_column_types"`
-	CSVComment           string   `toml:"csv_comment"`
-	CSVDelimiter         string   `toml:"csv_delimiter"`
-	CSVHeaderRowCount    int      `toml:"csv_header_row_count"`
-	CSVMeasurementColumn string   `toml:"csv_measurement_column"`
-	CSVSkipColumns       int      `toml:"csv_skip_columns"`
-	CSVSkipRows          int      `toml:"csv_skip_rows"`
-	CSVTagColumns        []string `toml:"csv_tag_columns"`
-	CSVTimestampColumn   string   `toml:"csv_timestamp_column"`
-	CSVTimestampFormat   string   `toml:"csv_timestamp_format"`
-	CSVTimezone          string   `toml:"csv_timezone"`
-	CSVTrimSpace         bool     `toml:"csv_trim_space"`
-	CSVSkipValues        []string `toml:"csv_skip_values"`
+	CSVColumnNames        []string `toml:"csv_column_names"`
+	CSVColumnTypes        []string `toml:"csv_column_types"`
+	CSVComment            string   `toml:"csv_comment"`
+	CSVDelimiter          string   `toml:"csv_delimiter"`
+	CSVHeaderRowCount     int      `toml:"csv_header_row_count"`
+	CSVMeasurementColumn  string   `toml:"csv_measurement_column"`
+	CSVSkipColumns        int      `toml:"csv_skip_columns"`
+	CSVSkipRows           int      `toml:"csv_skip_rows"`
+	CSVTagColumns         []string `toml:"csv_tag_columns"`
+	CSVTimestampColumn    string   `toml:"csv_timestamp_column"`
+	CSVTimestampFormat    string   `toml:"csv_timestamp_format"`
+	CSVTimezone           string   `toml:"csv_timezone"`
+	CSVTrimSpace          bool     `toml:"csv_trim_space"`
+	CSVSkipValues         []string `toml:"csv_skip_values"`
	CSVSkipErrors         bool     `toml:"csv_skip_errors"`
+	CSVMetadataRows       int      `toml:"csv_metadata_rows"`
+	CSVMetadataSeparators []string `toml:"csv_metadata_separators"`
+	CSVMetadataTrimSet    string   `toml:"csv_metadata_trim_set"`
// FormData configuration
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`