From 5adecc3cd9b93252388142b8e155b37d393de8df Mon Sep 17 00:00:00 2001 From: Ehsan <57578566+etycomputer@users.noreply.github.com> Date: Thu, 24 Feb 2022 11:28:16 +1000 Subject: [PATCH] feat(parsers/csv): Add metadata support to CSV parser plugin (#10083) --- plugins/inputs/http/http_test.go | 2 +- plugins/parsers/csv/README.md | 56 +++++++- plugins/parsers/csv/parser.go | 146 +++++++++++++++---- plugins/parsers/csv/parser_test.go | 219 +++++++++++++++++++++++++++++ plugins/parsers/registry.go | 32 +++-- 5 files changed, 412 insertions(+), 43 deletions(-) diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 80454e80c..68ff02ba8 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -377,7 +377,7 @@ func TestHTTPWithCSVFormat(t *testing.T) { plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := &csv.Parser{ MetricName: "metricName", - SkipRows: 2, + SkipRows: 3, ColumnNames: []string{"a", "b", "c"}, TagColumns: []string{"c"}, } diff --git a/plugins/parsers/csv/README.md b/plugins/parsers/csv/README.md index 196891c40..e320a0471 100644 --- a/plugins/parsers/csv/README.md +++ b/plugins/parsers/csv/README.md @@ -33,8 +33,26 @@ values. ## If this is not specified, type conversion will be done on the types above. csv_column_types = [] - ## Indicates the number of rows to skip before looking for header information. + ## Indicates the number of rows to skip before looking for metadata and header information. csv_skip_rows = 0 + + + ## Indicates the number of rows to parse as metadata before looking for header information. + ## By default, the parser assumes there are no metadata rows to parse. + ## If set, the parser would use the provided separators in the csv_metadata_separators to look for metadata. + ## Please note that by default, the (key, value) pairs will be added as fields. + ## Use the tag_columns to convert the metadata into tags. 
+ csv_metadata_rows = 0 + + ## A list of metadata separators. If csv_metadata_rows is set, + ## csv_metadata_separators must contain at least one separator. + ## Please note that separators are case sensitive and the sequence of the separators is respected. + csv_metadata_separators = [":", "="] + + ## A set of metadata trim characters. + ## If csv_metadata_trim_set is not set, no trimming is performed. + ## Please note that the trim cutset is case sensitive. + csv_metadata_trim_set = "" ## Indicates the number of columns to skip before looking for data to parse. ## These columns will be skipped in the header as well. @@ -85,7 +103,7 @@ values. ### csv_timestamp_column, csv_timestamp_format -By default the current time will be used for all created metrics, to set the +By default, the current time will be used for all created metrics, to set the time using the JSON document you can use the `csv_timestamp_column` and `csv_timestamp_format` options together to set the time to a value in the parsed document. 
@@ -121,15 +139,45 @@ Config: Input: -```shell +```csv measurement,cpu,time_user,time_system,time_idle,time cpu,cpu0,42,42,42,2018-09-13T13:03:28Z ``` Output: -```shell +```text cpu cpu=cpu0,time_user=42,time_system=42,time_idle=42 1536869008000000000 ``` +Config: + +```toml +[[inputs.file]] + files = ["example"] + data_format = "csv" + csv_metadata_rows = 2 + csv_metadata_separators = [":", "="] + csv_metadata_trim_set = " #" + csv_header_row_count = 1 + csv_tag_columns = ["Version","File Created"] + csv_timestamp_column = "time" + csv_timestamp_format = "2006-01-02T15:04:05Z07:00" +``` + +Input: + +```csv +# Version=1.1 +# File Created: 2021-11-17T07:02:45+10:00 +measurement,cpu,time_user,time_system,time_idle,time +cpu,cpu0,42,42,42,2018-09-13T13:03:28Z +``` + +Output: + +```text +cpu,File\ Created=2021-11-17T07:02:45+10:00,Version=1.1 cpu=cpu0,time_user=42,time_system=42,time_idle=42 1536869008000000000 +``` + [metric filtering]: /docs/CONFIGURATION.md#metric-filtering diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 4b53188eb..e998acab0 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -1,10 +1,12 @@ package csv import ( + "bufio" "bytes" "encoding/csv" "fmt" "io" + "sort" "strconv" "strings" "time" @@ -20,28 +22,91 @@ import ( type TimeFunc func() time.Time type Parser struct { - ColumnNames []string `toml:"csv_column_names"` - ColumnTypes []string `toml:"csv_column_types"` - Comment string `toml:"csv_comment"` - Delimiter string `toml:"csv_delimiter"` - HeaderRowCount int `toml:"csv_header_row_count"` - MeasurementColumn string `toml:"csv_measurement_column"` - MetricName string `toml:"metric_name"` - SkipColumns int `toml:"csv_skip_columns"` - SkipRows int `toml:"csv_skip_rows"` - TagColumns []string `toml:"csv_tag_columns"` - TimestampColumn string `toml:"csv_timestamp_column"` - TimestampFormat string `toml:"csv_timestamp_format"` - Timezone string `toml:"csv_timezone"` - TrimSpace bool 
`toml:"csv_trim_space"` - SkipValues []string `toml:"csv_skip_values"` - SkipErrors bool `toml:"csv_skip_errors"` - Log telegraf.Logger `toml:"-"` + ColumnNames []string `toml:"csv_column_names"` + ColumnTypes []string `toml:"csv_column_types"` + Comment string `toml:"csv_comment"` + Delimiter string `toml:"csv_delimiter"` + HeaderRowCount int `toml:"csv_header_row_count"` + MeasurementColumn string `toml:"csv_measurement_column"` + MetricName string `toml:"metric_name"` + SkipColumns int `toml:"csv_skip_columns"` + SkipRows int `toml:"csv_skip_rows"` + TagColumns []string `toml:"csv_tag_columns"` + TimestampColumn string `toml:"csv_timestamp_column"` + TimestampFormat string `toml:"csv_timestamp_format"` + Timezone string `toml:"csv_timezone"` + TrimSpace bool `toml:"csv_trim_space"` + SkipValues []string `toml:"csv_skip_values"` + SkipErrors bool `toml:"csv_skip_errors"` + MetadataRows int `toml:"csv_metadata_rows"` + MetadataSeparators []string `toml:"csv_metadata_separators"` + MetadataTrimSet string `toml:"csv_metadata_trim_set"` + Log telegraf.Logger `toml:"-"` + + metadataSeparatorList metadataPattern gotColumnNames bool - TimeFunc func() time.Time - DefaultTags map[string]string + TimeFunc func() time.Time + DefaultTags map[string]string + metadataTags map[string]string +} + +type metadataPattern []string + +func (record metadataPattern) Len() int { + return len(record) +} +func (record metadataPattern) Swap(i, j int) { + record[i], record[j] = record[j], record[i] +} +func (record metadataPattern) Less(i, j int) bool { + // Metadata with longer lengths should be ordered before shorter metadata + return len(record[i]) > len(record[j]) +} + +func (p *Parser) initializeMetadataSeparators() error { + // initialize metadata + p.metadataTags = map[string]string{} + p.metadataSeparatorList = []string{} + + if p.MetadataRows <= 0 { + return nil + } + + if len(p.MetadataSeparators) == 0 { + return fmt.Errorf("csv_metadata_separator required when specifying 
csv_metadata_rows") + } + + p.metadataSeparatorList = metadataPattern{} + patternList := map[string]bool{} + for _, pattern := range p.MetadataSeparators { + if patternList[pattern] { + // Ignore further, duplicated entries + continue + } + patternList[pattern] = true + p.metadataSeparatorList = append(p.metadataSeparatorList, pattern) + } + sort.Stable(p.metadataSeparatorList) + + return nil +} + +func (p *Parser) parseMetadataRow(haystack string) map[string]string { + haystack = strings.TrimRight(haystack, "\r\n") + for _, needle := range p.metadataSeparatorList { + metadata := strings.SplitN(haystack, needle, 2) + if len(metadata) < 2 { + continue + } + key := strings.Trim(metadata[0], p.MetadataTrimSet) + if len(key) > 0 { + value := strings.Trim(metadata[1], p.MetadataTrimSet) + return map[string]string{key: value} + } + } + return nil } func (p *Parser) Init() error { @@ -67,6 +132,10 @@ func (p *Parser) Init() error { return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") } + if err := p.initializeMetadataSeparators(); err != nil { + return fmt.Errorf("initializing separators failed: %v", err) + } + p.gotColumnNames = len(p.ColumnNames) > 0 if p.TimeFunc == nil { @@ -99,9 +168,17 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { return parseCSV(p, r) } -// ParseLine does not use any information in header and assumes DataColumns is set -// it will also not skip any rows func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + if len(line) == 0 { + if p.SkipRows > 0 { + p.SkipRows-- + return nil, io.EOF + } + if p.MetadataRows > 0 { + p.MetadataRows-- + return nil, io.EOF + } + } r := bytes.NewReader([]byte(line)) metrics, err := parseCSV(p, r) if err != nil { @@ -117,15 +194,28 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { } func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) { - csvReader := p.compile(r) + lineReader := bufio.NewReader(r) // skip first rows for 
p.SkipRows > 0 { - _, err := csvReader.Read() - if err != nil { + line, err := lineReader.ReadString('\n') + if err != nil && len(line) == 0 { return nil, err } p.SkipRows-- } + // Parse metadata + for p.MetadataRows > 0 { + line, err := lineReader.ReadString('\n') + if err != nil && len(line) == 0 { + return nil, err + } + p.MetadataRows-- + m := p.parseMetadataRow(line) + for k, v := range m { + p.metadataTags[k] = v + } + } + csvReader := p.compile(lineReader) // if there is a header, and we did not get DataColumns // set DataColumns to names extracted from the header // we always reread the header to avoid side effects @@ -261,6 +351,11 @@ outer: } } + // add metadata fields + for k, v := range p.metadataTags { + tags[k] = v + } + // add default tags for k, v := range p.DefaultTags { tags[k] = v @@ -342,6 +437,9 @@ func (p *Parser) InitFromConfig(config *parsers.Config) error { p.Timezone = config.CSVTimezone p.DefaultTags = config.DefaultTags p.SkipValues = config.CSVSkipValues + p.MetadataRows = config.CSVMetadataRows + p.MetadataSeparators = config.CSVMetadataSeparators + p.MetadataTrimSet = config.CSVMetadataTrimSet return p.Init() } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 398f61449..b0dd6152b 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -827,3 +827,222 @@ corrupted_line require.Equal(t, expectedFields0, metrics[0].Fields()) require.Equal(t, expectedFields1, metrics[1].Fields()) } + +func TestParseMetadataSeparators(t *testing.T) { + p := &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 0, + MetadataSeparators: []string{}, + } + err := p.Init() + require.NoError(t, err) + p = &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 1, + MetadataSeparators: []string{}, + } + err = p.Init() + require.Error(t, err) + require.Equal(t, err.Error(), "initializing separators failed: "+ + "csv_metadata_separator required when specifying 
csv_metadata_rows") + p = &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 1, + MetadataSeparators: []string{",", "=", ",", ":", "=", ":="}, + } + err = p.Init() + require.NoError(t, err) + require.Len(t, p.metadataSeparatorList, 4) + require.Len(t, p.MetadataTrimSet, 0) + require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", "=", ":"}) + p = &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 1, + MetadataSeparators: []string{",", ":", "=", ":="}, + MetadataTrimSet: " #'", + } + err = p.Init() + require.NoError(t, err) + require.Len(t, p.metadataSeparatorList, 4) + require.Len(t, p.MetadataTrimSet, 3) + require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", ":", "="}) +} + +func TestParseMetadataRow(t *testing.T) { + p := &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 5, + MetadataSeparators: []string{":=", ",", ":", "="}, + } + err := p.Init() + require.NoError(t, err) + require.Empty(t, p.metadataTags) + m := p.parseMetadataRow("# this is a not matching string") + require.Nil(t, m) + m = p.parseMetadataRow("# key1 : value1 \r\n") + require.Equal(t, m, map[string]string{"# key1 ": " value1 "}) + m = p.parseMetadataRow("key2=1234\n") + require.Equal(t, m, map[string]string{"key2": "1234"}) + m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n") + require.Equal(t, m, map[string]string{" file created ": " 2021-10-08T12:34:18+10:00 "}) + m = p.parseMetadataRow("file created: 2021-10-08T12:34:18\t\r\r\n") + require.Equal(t, m, map[string]string{"file created": " 2021-10-08T12:34:18\t"}) + p = &Parser{ + ColumnNames: []string{"a", "b"}, + MetadataRows: 5, + MetadataSeparators: []string{":=", ",", ":", "="}, + MetadataTrimSet: " #'", + } + err = p.Init() + require.NoError(t, err) + require.Empty(t, p.metadataTags) + m = p.parseMetadataRow("# this is a not matching string") + require.Nil(t, m) + m = p.parseMetadataRow("# key1 : value1 \r\n") + require.Equal(t, m, map[string]string{"key1": 
"value1"}) + m = p.parseMetadataRow("key2=1234\n") + require.Equal(t, m, map[string]string{"key2": "1234"}) + m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n") + require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18+10:00"}) + m = p.parseMetadataRow("file created: '2021-10-08T12:34:18'\r\n") + require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18"}) +} + +func TestParseCSVFileWithMetadata(t *testing.T) { + p := &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " #", + } + err := p.Init() + require.NoError(t, err) + testCSV := `garbage nonsense that needs be skipped + +# version= 1.0 + + invalid meta data that can be ignored. +file created: 2021-10-08T12:34:18+10:00 +timestamp,type,name,status +2020-11-23T08:19:27+10:00,Reader,R002,1 +#2020-11-04T13:23:04+10:00,Reader,R031,0 +2020-11-04T13:29:47+10:00,Coordinator,C001,0` + expectedFields := []map[string]interface{}{ + { + "name": "R002", + "status": int64(1), + "timestamp": "2020-11-23T08:19:27+10:00", + }, + { + "name": "C001", + "status": int64(0), + "timestamp": "2020-11-04T13:29:47+10:00", + }, + } + expectedTags := []map[string]string{ + { + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + { + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Coordinator", + "version": "1.0", + }, + } + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + for i, m := range metrics { + require.Equal(t, expectedFields[i], m.Fields()) + require.Equal(t, expectedTags[i], m.Tags()) + } + + p = &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type", "version"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " 
#", + } + err = p.Init() + require.NoError(t, err) + testCSVRows := []string{ + "garbage nonsense that needs be skipped", + "", + "# version= 1.0\r\n", + "", + " invalid meta data that can be ignored.\r\n", + "file created: 2021-10-08T12:34:18+10:00", + "timestamp,type,name,status\n", + "2020-11-23T08:19:27+10:00,Reader,R002,1\r\n", + "#2020-11-04T13:23:04+10:00,Reader,R031,0\n", + "2020-11-04T13:29:47+10:00,Coordinator,C001,0", + } + + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + rowIndex := 0 + for ; rowIndex < 6; rowIndex++ { + m, err := p.ParseLine(testCSVRows[rowIndex]) + require.Error(t, io.EOF, err) + require.Error(t, err) + require.Nil(t, m) + } + m, err := p.ParseLine(testCSVRows[rowIndex]) + require.Nil(t, err) + require.Nil(t, m) + rowIndex++ + m, err = p.ParseLine(testCSVRows[rowIndex]) + require.NoError(t, err) + require.Equal(t, expectedFields[0], m.Fields()) + require.Equal(t, expectedTags[0], m.Tags()) + rowIndex++ + m, err = p.ParseLine(testCSVRows[rowIndex]) + require.NoError(t, err) + require.Nil(t, m) + rowIndex++ + m, err = p.ParseLine(testCSVRows[rowIndex]) + require.NoError(t, err) + require.Equal(t, expectedFields[1], m.Fields()) + require.Equal(t, expectedTags[1], m.Tags()) +} + +func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) { + // This tests makes sure that the default tags and metadata tags don't overwrite record data + // This test also covers the scenario where the metadata overwrites the default tag + p := &Parser{ + ColumnNames: []string{"first", "second", "third"}, + TagColumns: []string{"second", "third"}, + TimeFunc: DefaultTime, + MetadataRows: 2, + MetadataSeparators: []string{"="}, + } + err := p.Init() + require.NoError(t, err) + p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"}) + m, err := p.ParseLine("second=orange") + require.Error(t, io.EOF, err) + require.Error(t, err) + require.Nil(t, m) + m, err = p.ParseLine("fourth=plain") + require.NoError(t, err) + 
require.Nil(t, m) + expectedFields := []map[string]interface{}{{"first": 1.4}} + expectedTags := []map[string]string{{"second": "orange", "third": "bye", "fourth": "car"}} + + m, err = p.ParseLine("1.4,apple,hi") + require.NoError(t, err) + + require.Equal(t, expectedFields[0], m.Fields()) + require.Equal(t, expectedTags[0], m.Tags()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 3c974a320..985d2ab0d 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -154,20 +154,24 @@ type Config struct { GrokUniqueTimestamp string `toml:"grok_unique_timestamp"` //csv configuration - CSVColumnNames []string `toml:"csv_column_names"` - CSVColumnTypes []string `toml:"csv_column_types"` - CSVComment string `toml:"csv_comment"` - CSVDelimiter string `toml:"csv_delimiter"` - CSVHeaderRowCount int `toml:"csv_header_row_count"` - CSVMeasurementColumn string `toml:"csv_measurement_column"` - CSVSkipColumns int `toml:"csv_skip_columns"` - CSVSkipRows int `toml:"csv_skip_rows"` - CSVTagColumns []string `toml:"csv_tag_columns"` - CSVTimestampColumn string `toml:"csv_timestamp_column"` - CSVTimestampFormat string `toml:"csv_timestamp_format"` - CSVTimezone string `toml:"csv_timezone"` - CSVTrimSpace bool `toml:"csv_trim_space"` - CSVSkipValues []string `toml:"csv_skip_values"` + CSVColumnNames []string `toml:"csv_column_names"` + CSVColumnTypes []string `toml:"csv_column_types"` + CSVComment string `toml:"csv_comment"` + CSVDelimiter string `toml:"csv_delimiter"` + CSVHeaderRowCount int `toml:"csv_header_row_count"` + CSVMeasurementColumn string `toml:"csv_measurement_column"` + CSVSkipColumns int `toml:"csv_skip_columns"` + CSVSkipRows int `toml:"csv_skip_rows"` + CSVTagColumns []string `toml:"csv_tag_columns"` + CSVTimestampColumn string `toml:"csv_timestamp_column"` + CSVTimestampFormat string `toml:"csv_timestamp_format"` + CSVTimezone string `toml:"csv_timezone"` + CSVTrimSpace bool `toml:"csv_trim_space"` + CSVSkipValues 
[]string `toml:"csv_skip_values"` + CSVSkipErrors bool `toml:"csv_skip_errors"` + CSVMetadataRows int `toml:"csv_metadata_rows"` + CSVMetadataSeparators []string `toml:"csv_metadata_separators"` + CSVMetadataTrimSet string `toml:"csv_metadata_trim_set"` // FormData configuration FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`