feat: add option to skip errors during CSV parsing (#10267)

This commit is contained in:
Alberto Fernandez 2021-12-22 21:07:14 +01:00 committed by GitHub
parent 0cedc36729
commit a202f68333
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 1 deletion

View File

@ -1437,6 +1437,7 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns)
c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace)
c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues)
c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors)
c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)
@ -1652,7 +1653,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",

View File

@ -77,6 +77,10 @@ values.
## Indicates values to skip, such as an empty string value "".
## The field will be skipped entirely where it matches any values inserted here.
csv_skip_values = []
## If set to true, the parser will skip CSV lines that cannot be parsed.
## By default, this is false.
csv_skip_errors = false
```
### csv_timestamp_column, csv_timestamp_format

View File

@ -32,6 +32,7 @@ type Config struct {
Timezone string `toml:"csv_timezone"`
TrimSpace bool `toml:"csv_trim_space"`
SkipValues []string `toml:"csv_skip_values"`
SkipErrors bool `toml:"csv_skip_errors"`
gotColumnNames bool
@ -42,6 +43,7 @@ type Config struct {
// Parser is a CSV parser, you should use NewParser to create a new instance.
type Parser struct {
*Config
Log telegraf.Logger
}
func NewParser(c *Config) (*Parser, error) {
@ -169,6 +171,10 @@ func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) {
for _, record := range table {
m, err := p.parseRecord(record)
if err != nil {
if p.SkipErrors {
p.Log.Debugf("Parsing error: %v", err)
continue
}
return metrics, err
}
metrics = append(metrics, m)

View File

@ -817,3 +817,36 @@ func TestSkipSpecifiedStringValue(t *testing.T) {
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
// TestSkipErrorOnCorruptedCSVLine verifies that, with csv_skip_errors
// enabled, a row that cannot be parsed (here: a line whose timestamp
// column is garbage) is dropped without aborting the parse, while the
// valid rows before and after it are still returned.
func TestSkipErrorOnCorruptedCSVLine(t *testing.T) {
	p, err := NewParser(
		&Config{
			HeaderRowCount:  1,
			TimestampColumn: "date",
			// Reference-time layout: day/month/2-digit-year, 12-hour clock.
			TimestampFormat: "02/01/06 03:04:05 PM",
			TimeFunc:        DefaultTime,
			SkipErrors:      true,
		},
	)
	require.NoError(t, err)
	// SkipErrors logs each dropped line at debug level, so a logger is required.
	p.Log = testutil.Logger{}

	// The middle line has no valid "date" value and must be skipped.
	testCSV := `date,a,b
23/05/09 11:05:06 PM,1,2
corrupted_line
07/11/09 04:06:07 PM,3,4`

	metrics, err := p.Parse([]byte(testCSV))
	require.NoError(t, err)
	// Exactly the two valid rows survive: fail cleanly (instead of
	// panicking on the index below) if the corrupted line leaked
	// through or a valid row was dropped.
	require.Len(t, metrics, 2)

	expectedFields0 := map[string]interface{}{
		"a": int64(1),
		"b": int64(2),
	}
	expectedFields1 := map[string]interface{}{
		"a": int64(3),
		"b": int64(4),
	}
	require.Equal(t, expectedFields0, metrics[0].Fields())
	require.Equal(t, expectedFields1, metrics[1].Fields())
}

View File

@ -152,6 +152,7 @@ type Config struct {
CSVTimezone string `toml:"csv_timezone"`
CSVTrimSpace bool `toml:"csv_trim_space"`
CSVSkipValues []string `toml:"csv_skip_values"`
CSVSkipErrors bool `toml:"csv_skip_errors"`
// FormData configuration
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
@ -250,6 +251,7 @@ func NewParser(config *Config) (Parser, error) {
Timezone: config.CSVTimezone,
DefaultTags: config.DefaultTags,
SkipValues: config.CSVSkipValues,
SkipErrors: config.CSVSkipErrors,
}
return csv.NewParser(config)