diff --git a/config/config_test.go b/config/config_test.go index b8a7dfc3b..6f405cf90 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -413,7 +413,7 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) { param: map[string]interface{}{ "HeaderRowCount": cfg.CSVHeaderRowCount, }, - mask: []string{"TimeFunc"}, + mask: []string{"TimeFunc", "ResetMode"}, }, "xpath_protobuf": { param: map[string]interface{}{ @@ -550,7 +550,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { param: map[string]interface{}{ "HeaderRowCount": cfg.CSVHeaderRowCount, }, - mask: []string{"TimeFunc"}, + mask: []string{"TimeFunc", "ResetMode"}, }, "xpath_protobuf": { param: map[string]interface{}{ @@ -638,7 +638,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { options = append(options, cmpopts.IgnoreFields(stype, settings.mask...)) } - // Do a manual comparision as require.EqualValues will also work on unexported fields + // Do a manual comparison as require.EqualValues will also work on unexported fields // that cannot be cleared or ignored. diff := cmp.Diff(expected[i], actual[i], options...) require.Emptyf(t, diff, "Difference in SetParser() for %q", format) diff --git a/plugins/parsers/csv/README.md b/plugins/parsers/csv/README.md index 534725209..b74b3e908 100644 --- a/plugins/parsers/csv/README.md +++ b/plugins/parsers/csv/README.md @@ -98,6 +98,14 @@ values. ## If set to true, the parser will skip csv lines that cannot be parsed. ## By default, this is false csv_skip_errors = false + + ## Reset the parser on given conditions. + ## This option can be used to reset the parser's state e.g. when always reading a + ## full CSV structure including header etc. Available modes are + ## "none" -- do not reset the parser (default) + ## "always" -- reset the parser with each call (ignored in line-wise parsing) + ## Helpful when e.g. reading whole files in each gather-cycle. + # csv_reset_mode = "none" ``` ### csv_timestamp_column, csv_timestamp_format diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 0c3033a06..6e274854b 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -41,6 +42,7 @@ type Parser struct { MetadataRows int `toml:"csv_metadata_rows"` MetadataSeparators []string `toml:"csv_metadata_separators"` MetadataTrimSet string `toml:"csv_metadata_trim_set"` + ResetMode string `toml:"csv_reset_mode"` Log telegraf.Logger `toml:"-"` metadataSeparatorList metadataPattern @@ -50,6 +52,11 @@ type Parser struct { TimeFunc func() time.Time DefaultTags map[string]string metadataTags map[string]string + + gotInitialColumnNames bool + remainingSkipRows int + remainingHeaderRows int + remainingMetadataRows int } type metadataPattern []string @@ -109,6 +116,19 @@ func (p *Parser) parseMetadataRow(haystack string) map[string]string { return nil } +func (p *Parser) Reset() { + // Reset the columns if they were not user-specified + p.gotColumnNames = p.gotInitialColumnNames + if !p.gotInitialColumnNames { + p.ColumnNames = nil + } + + // Reset the internal counters + p.remainingSkipRows = p.SkipRows + p.remainingHeaderRows = p.HeaderRowCount + p.remainingMetadataRows = p.MetadataRows +} + func (p *Parser) Init() error { if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 { return fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified") @@ -128,6 +148,7 @@ func (p *Parser) Init() error { } } + p.gotInitialColumnNames = len(p.ColumnNames) > 0 if len(p.ColumnNames) > 0 && len(p.ColumnTypes) > 0 && len(p.ColumnNames) != len(p.ColumnTypes) { return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") } @@ -136,12 +157,18 @@ func (p *Parser) Init() error { return fmt.Errorf("initializing separators failed: %v", err) } - p.gotColumnNames = len(p.ColumnNames) > 0 - if p.TimeFunc == nil { p.TimeFunc = time.Now } + if p.ResetMode == "" { + p.ResetMode = "none" + } + if !choice.Contains(p.ResetMode, []string{"none", "always"}) { + return fmt.Errorf("unknown reset mode %q", p.ResetMode) + } + p.Reset() + return nil } @@ -164,18 +191,23 @@ func (p *Parser) compile(r io.Reader) *csv.Reader { } func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + // Reset the parser according to the specified mode + if p.ResetMode == "always" { + p.Reset() + } + r := bytes.NewReader(buf) return parseCSV(p, r) } func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { if len(line) == 0 { - if p.SkipRows > 0 { - p.SkipRows-- + if p.remainingSkipRows > 0 { + p.remainingSkipRows-- return nil, io.EOF } - if p.MetadataRows > 0 { - p.MetadataRows-- + if p.remainingMetadataRows > 0 { + p.remainingMetadataRows-- return nil, io.EOF } } @@ -196,20 +228,20 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) { lineReader := bufio.NewReader(r) // skip first rows - for p.SkipRows > 0 { + for p.remainingSkipRows > 0 { line, err := lineReader.ReadString('\n') if err != nil && len(line) == 0 { return nil, err } - p.SkipRows-- + p.remainingSkipRows-- } // Parse metadata - for p.MetadataRows > 0 { + for p.remainingMetadataRows > 0 { line, err := lineReader.ReadString('\n') if err != nil && len(line) == 0 { return nil, err } - p.MetadataRows-- + p.remainingMetadataRows-- m := p.parseMetadataRow(line) for k, v := range m { p.metadataTags[k] = v @@ -221,12 +253,12 @@ func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) { // we always reread the header to avoid side effects // in cases where multiple files with different // headers are read - for p.HeaderRowCount > 0 { + for p.remainingHeaderRows > 0 { header, err := csvReader.Read() if err != nil { return nil, err } - p.HeaderRowCount-- + p.remainingHeaderRows-- if p.gotColumnNames { // Ignore header lines if columns are named continue @@ -440,6 +472,7 @@ func (p *Parser) InitFromConfig(config *parsers.Config) error { p.MetadataRows = config.CSVMetadataRows p.MetadataSeparators = config.CSVMetadataSeparators p.MetadataTrimSet = config.CSVMetadataTrimSet + p.ResetMode = "none" return p.Init() } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 7768d2794..1462d124f 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -1046,3 +1046,424 @@ func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) { require.Equal(t, expectedFields[0], m.Fields()) require.Equal(t, expectedTags[0], m.Tags()) } + +func TestParseCSVResetModeInvalid(t *testing.T) { + p := &Parser{ + HeaderRowCount: 1, + ResetMode: "garbage", + } + require.Error(t, p.Init(), `unknown reset mode "garbage"`) +} + +func TestParseCSVResetModeNone(t *testing.T) { + testCSV := `garbage nonsense that needs be skipped + +# version= 1.0 + + invalid meta data that can be ignored. +file created: 2021-10-08T12:34:18+10:00 +timestamp,type,name,status +2020-11-23T08:19:27+00:00,Reader,R002,1 +#2020-11-04T13:23:04+00:00,Reader,R031,0 +2020-11-04T13:29:47+00:00,Coordinator,C001,0` + + expected := []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R002", + "status": int64(1), + }, + time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC), + ), + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Coordinator", + "version": "1.0", + }, + map[string]interface{}{ + "name": "C001", + "status": int64(0), + }, + time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC), + ), + } + + p := &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " #", + TimestampColumn: "timestamp", + TimestampFormat: "2006-01-02T15:04:05Z07:00", + ResetMode: "none", + } + require.NoError(t, p.Init()) + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + + // Do the parsing the first time + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + testutil.RequireMetricsEqual(t, expected, metrics) + + // Parsing another data line should work when not resetting + additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n" + additionalExpected := []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R009", + "status": int64(5), + }, + time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC), + ), + } + metrics, err = p.Parse([]byte(additionalCSV)) + require.NoError(t, err) + testutil.RequireMetricsEqual(t, additionalExpected, metrics) + + // This should fail when not resetting but reading again due to the header etc + _, err = p.Parse([]byte(testCSV)) + require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`) +} + +func TestParseCSVLinewiseResetModeNone(t *testing.T) { + testCSV := []string{ + "garbage nonsense that needs be skipped", + "", + "# version= 1.0\r\n", + "", + " invalid meta data that can be ignored.\r\n", + "file created: 2021-10-08T12:34:18+10:00", + "timestamp,type,name,status\n", + "2020-11-23T08:19:27+00:00,Reader,R002,1\r\n", + "#2020-11-04T13:23:04+00:00,Reader,R031,0\n", + "2020-11-04T13:29:47+00:00,Coordinator,C001,0", + } + + expected := []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R002", + "status": int64(1), + }, + time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC), + ), + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Coordinator", + "version": "1.0", + }, + map[string]interface{}{ + "name": "C001", + "status": int64(0), + }, + time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC), + ), + } + + p := &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " #", + TimestampColumn: "timestamp", + TimestampFormat: "2006-01-02T15:04:05Z07:00", + ResetMode: "none", + } + require.NoError(t, p.Init()) + + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + + // Do the parsing the first time + var metrics []telegraf.Metric + for i, r := range testCSV { + m, err := p.ParseLine(r) + // Header lines should return EOF + if m == nil { + require.Error(t, io.EOF, err) + continue + } + require.NoErrorf(t, err, "failed in row %d", i) + metrics = append(metrics, m) + } + testutil.RequireMetricsEqual(t, expected, metrics) + + // Parsing another data line should work when not resetting + additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n" + additionalExpected := metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R009", + "status": int64(5), + }, + time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC), + ) + m, err := p.ParseLine(additionalCSV) + require.NoError(t, err) + testutil.RequireMetricEqual(t, additionalExpected, m) + + // This should fail when not resetting but reading again due to the header etc + _, err = p.ParseLine(testCSV[0]) + require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`) +} + +func TestParseCSVResetModeAlways(t *testing.T) { + testCSV := `garbage nonsense that needs be skipped + +# version= 1.0 + + invalid meta data that can be ignored. +file created: 2021-10-08T12:34:18+10:00 +timestamp,type,name,status +2020-11-23T08:19:27+00:00,Reader,R002,1 +#2020-11-04T13:23:04+00:00,Reader,R031,0 +2020-11-04T13:29:47+00:00,Coordinator,C001,0` + + expected := []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R002", + "status": int64(1), + }, + time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC), + ), + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Coordinator", + "version": "1.0", + }, + map[string]interface{}{ + "name": "C001", + "status": int64(0), + }, + time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC), + ), + } + + p := &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type", "category"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " #", + TimestampColumn: "timestamp", + TimestampFormat: "2006-01-02T15:04:05Z07:00", + ResetMode: "always", + } + require.NoError(t, p.Init()) + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + + // Do the parsing the first time + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + testutil.RequireMetricsEqual(t, expected, metrics) + + // Parsing another data line should fail as it is interpreted as header + additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n" + metrics, err = p.Parse([]byte(additionalCSV)) + require.Nil(t, metrics) + require.Error(t, io.EOF, err) + + // Prepare a second CSV with different column names + testCSV = `garbage nonsense that needs be skipped + +# version= 1.0 + + invalid meta data that can be ignored. +file created: 2021-10-08T12:34:18+10:00 +timestamp,category,id,flag +2020-11-23T08:19:27+00:00,Reader,R002,1 +#2020-11-04T13:23:04+00:00,Reader,R031,0 +2020-11-04T13:29:47+00:00,Coordinator,C001,0` + + expected = []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "category": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "id": "R002", + "flag": int64(1), + }, + time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC), + ), + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "category": "Coordinator", + "version": "1.0", + }, + map[string]interface{}{ + "id": "C001", + "flag": int64(0), + }, + time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC), + ), + } + + // This should work as the parser is reset + metrics, err = p.Parse([]byte(testCSV)) + require.NoError(t, err) + testutil.RequireMetricsEqual(t, expected, metrics) +} + +func TestParseCSVLinewiseResetModeAlways(t *testing.T) { + testCSV := []string{ + "garbage nonsense that needs be skipped", + "", + "# version= 1.0\r\n", + "", + " invalid meta data that can be ignored.\r\n", + "file created: 2021-10-08T12:34:18+10:00", + "timestamp,type,name,status\n", + "2020-11-23T08:19:27+00:00,Reader,R002,1\r\n", + "#2020-11-04T13:23:04+00:00,Reader,R031,0\n", + "2020-11-04T13:29:47+00:00,Coordinator,C001,0", + } + + expected := []telegraf.Metric{ + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R002", + "status": int64(1), + }, + time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC), + ), + metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Coordinator", + "version": "1.0", + }, + map[string]interface{}{ + "name": "C001", + "status": int64(0), + }, + time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC), + ), + } + + p := &Parser{ + HeaderRowCount: 1, + SkipRows: 2, + MetadataRows: 4, + Comment: "#", + TagColumns: []string{"type"}, + MetadataSeparators: []string{":", "="}, + MetadataTrimSet: " #", + TimestampColumn: "timestamp", + TimestampFormat: "2006-01-02T15:04:05Z07:00", + ResetMode: "always", + } + require.NoError(t, p.Init()) + + // Set default Tags + p.SetDefaultTags(map[string]string{"test": "tag"}) + + // Do the parsing the first time + var metrics []telegraf.Metric + for i, r := range testCSV { + m, err := p.ParseLine(r) + // Header lines should return EOF + if m == nil { + require.Error(t, io.EOF, err) + continue + } + require.NoErrorf(t, err, "failed in row %d", i) + metrics = append(metrics, m) + } + testutil.RequireMetricsEqual(t, expected, metrics) + + // Parsing another data line should work in line-wise parsing as + // reset-mode "always" is ignored. + additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n" + additionalExpected := metric.New( + "", + map[string]string{ + "file created": "2021-10-08T12:34:18+10:00", + "test": "tag", + "type": "Reader", + "version": "1.0", + }, + map[string]interface{}{ + "name": "R009", + "status": int64(5), + }, + time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC), + ) + m, err := p.ParseLine(additionalCSV) + require.NoError(t, err) + testutil.RequireMetricEqual(t, additionalExpected, m) + + // This should fail as reset-mode "always" is ignored in line-wise parsing + _, err = p.ParseLine(testCSV[0]) + require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`) +}