From a202f68333e4d8da1e4829909251d2bdb53c7baf Mon Sep 17 00:00:00 2001 From: Alberto Fernandez Date: Wed, 22 Dec 2021 21:07:14 +0100 Subject: [PATCH] feat: add option to skip errors during CSV parsing (#10267) --- config/config.go | 3 ++- plugins/parsers/csv/README.md | 4 ++++ plugins/parsers/csv/parser.go | 6 ++++++ plugins/parsers/csv/parser_test.go | 33 ++++++++++++++++++++++++++++++ plugins/parsers/registry.go | 2 ++ 5 files changed, 47 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index aedf0b890..c8c9e5312 100644 --- a/config/config.go +++ b/config/config.go @@ -1437,6 +1437,7 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config, c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns) c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace) c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues) + c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors) c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys) @@ -1652,7 +1653,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count", - "csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", + "csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors", "csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", "dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path", diff --git a/plugins/parsers/csv/README.md b/plugins/parsers/csv/README.md index c1d727a37..196891c40 100644 --- a/plugins/parsers/csv/README.md +++ b/plugins/parsers/csv/README.md @@ -77,6 +77,10 @@ values. ## Indicates values to skip, such as an empty string value "". ## The field will be skipped entirely where it matches any values inserted here. csv_skip_values = [] + + ## If set to true, the parser will skip csv lines that cannot be parsed. + ## By default, this is false + csv_skip_errors = false ``` ### csv_timestamp_column, csv_timestamp_format diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 3f46c24b9..a607f1dc6 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -32,6 +32,7 @@ type Config struct { Timezone string `toml:"csv_timezone"` TrimSpace bool `toml:"csv_trim_space"` SkipValues []string `toml:"csv_skip_values"` + SkipErrors bool `toml:"csv_skip_errors"` gotColumnNames bool @@ -42,6 +43,7 @@ type Config struct { // Parser is a CSV parser, you should use NewParser to create a new instance. type Parser struct { *Config + Log telegraf.Logger } func NewParser(c *Config) (*Parser, error) { @@ -169,6 +171,10 @@ func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) { for _, record := range table { m, err := p.parseRecord(record) if err != nil { + if p.SkipErrors { + p.Log.Debugf("Parsing error: %v", err) + continue + } return metrics, err } metrics = append(metrics, m) diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 5fc72bdb5..a0047b1ad 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -817,3 +817,36 @@ func TestSkipSpecifiedStringValue(t *testing.T) { } testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime()) } + +func TestSkipErrorOnCorruptedCSVLine(t *testing.T) { + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + TimestampColumn: "date", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + SkipErrors: true, + }, + ) + require.NoError(t, err) + p.Log = testutil.Logger{} + testCSV := `date,a,b +23/05/09 11:05:06 PM,1,2 +corrupted_line +07/11/09 04:06:07 PM,3,4` + + expectedFields0 := map[string]interface{}{ + "a": int64(1), + "b": int64(2), + } + + expectedFields1 := map[string]interface{}{ + "a": int64(3), + "b": int64(4), + } + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields0, metrics[0].Fields()) + require.Equal(t, expectedFields1, metrics[1].Fields()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index fcdfc473a..208dd7ccc 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -152,6 +152,7 @@ type Config struct { CSVTimezone string `toml:"csv_timezone"` CSVTrimSpace bool `toml:"csv_trim_space"` CSVSkipValues []string `toml:"csv_skip_values"` + CSVSkipErrors bool `toml:"csv_skip_errors"` // FormData configuration FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"` @@ -250,6 +251,7 @@ func NewParser(config *Config) (Parser, error) { Timezone: config.CSVTimezone, DefaultTags: config.DefaultTags, SkipValues: config.CSVSkipValues, + SkipErrors: config.CSVSkipErrors, } return csv.NewParser(config)