fix: add reset-mode flag for CSV parser (#11288)

Sven Rebhan 2022-06-30 20:11:25 +02:00 committed by GitHub
parent a050fee9d3
commit 7d83b076c2
4 changed files with 477 additions and 15 deletions

View File

@@ -413,7 +413,7 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
param: map[string]interface{}{
"HeaderRowCount": cfg.CSVHeaderRowCount,
},
mask: []string{"TimeFunc"},
mask: []string{"TimeFunc", "ResetMode"},
},
"xpath_protobuf": {
param: map[string]interface{}{
@@ -550,7 +550,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
param: map[string]interface{}{
"HeaderRowCount": cfg.CSVHeaderRowCount,
},
mask: []string{"TimeFunc"},
mask: []string{"TimeFunc", "ResetMode"},
},
"xpath_protobuf": {
param: map[string]interface{}{
@@ -638,7 +638,7 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}
// Do a manual comparision as require.EqualValues will also work on unexported fields
// Do a manual comparison as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetParser() for %q", format)
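Aside (not part of the diff): the `mask` entries above feed `cmpopts.IgnoreFields`, so the newly added `ResetMode` field is skipped during the struct comparison just like `TimeFunc`. A minimal, self-contained sketch of that mechanism, using an invented toy struct:

```go
package main

import (
	"fmt"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// toyParser stands in for a parser config struct; only FieldA is meant to be compared.
type toyParser struct {
	FieldA    string
	ResetMode string
	TimeFunc  func() time.Time
}

func main() {
	expected := toyParser{FieldA: "x"}
	actual := toyParser{FieldA: "x", ResetMode: "none", TimeFunc: time.Now}

	// Mirrors cmpopts.IgnoreFields(stype, settings.mask...) in the test:
	// the masked fields never enter the comparison.
	opts := cmpopts.IgnoreFields(toyParser{}, "TimeFunc", "ResetMode")
	fmt.Println(cmp.Diff(expected, actual, opts) == "") // prints: true
}
```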

View File

@@ -98,6 +98,14 @@ values.
## If set to true, the parser will skip csv lines that cannot be parsed.
## By default, this is false
csv_skip_errors = false
## Reset the parser on given conditions.
## This option can be used to reset the parser's state, e.g. when always reading
## a full CSV structure including the header. This is helpful when e.g. reading
## whole files in each gather cycle. Available modes are
##    "none"   -- do not reset the parser (default)
##    "always" -- reset the parser with each call (ignored in line-wise parsing)
# csv_reset_mode = "none"
```
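As a usage illustration (an editorial aside, not part of the diff), a hedged example of wiring the new option into an input that re-reads a complete CSV file every interval; the plugin choice and file path are placeholders:

```toml
[[inputs.file]]
  ## Placeholder path; any input supporting data_format = "csv" works the same way.
  files = ["/path/to/metrics.csv"]
  data_format = "csv"
  csv_header_row_count = 1
  csv_timestamp_column = "timestamp"
  csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
  ## The whole file (including its header) is parsed on every gather cycle,
  ## so the parser state must be reset each time.
  csv_reset_mode = "always"
```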
### csv_timestamp_column, csv_timestamp_format

View File

@@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
@@ -41,6 +42,7 @@ type Parser struct {
MetadataRows int `toml:"csv_metadata_rows"`
MetadataSeparators []string `toml:"csv_metadata_separators"`
MetadataTrimSet string `toml:"csv_metadata_trim_set"`
ResetMode string `toml:"csv_reset_mode"`
Log telegraf.Logger `toml:"-"`
metadataSeparatorList metadataPattern
@@ -50,6 +52,11 @@ type Parser struct {
TimeFunc func() time.Time
DefaultTags map[string]string
metadataTags map[string]string
gotInitialColumnNames bool
remainingSkipRows int
remainingHeaderRows int
remainingMetadataRows int
}
type metadataPattern []string
@@ -109,6 +116,19 @@ func (p *Parser) parseMetadataRow(haystack string) map[string]string {
return nil
}
func (p *Parser) Reset() {
// Reset the columns if they were not user-specified
p.gotColumnNames = p.gotInitialColumnNames
if !p.gotInitialColumnNames {
p.ColumnNames = nil
}
// Reset the internal counters
p.remainingSkipRows = p.SkipRows
p.remainingHeaderRows = p.HeaderRowCount
p.remainingMetadataRows = p.MetadataRows
}
func (p *Parser) Init() error {
if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 {
return fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified")
@@ -128,6 +148,7 @@ func (p *Parser) Init() error {
}
}
p.gotInitialColumnNames = len(p.ColumnNames) > 0
if len(p.ColumnNames) > 0 && len(p.ColumnTypes) > 0 && len(p.ColumnNames) != len(p.ColumnTypes) {
return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
}
@@ -136,12 +157,18 @@ func (p *Parser) Init() error {
return fmt.Errorf("initializing separators failed: %v", err)
}
p.gotColumnNames = len(p.ColumnNames) > 0
if p.TimeFunc == nil {
p.TimeFunc = time.Now
}
if p.ResetMode == "" {
p.ResetMode = "none"
}
if !choice.Contains(p.ResetMode, []string{"none", "always"}) {
return fmt.Errorf("unknown reset mode %q", p.ResetMode)
}
p.Reset()
return nil
}
@@ -164,18 +191,23 @@ func (p *Parser) compile(r io.Reader) *csv.Reader {
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
// Reset the parser according to the specified mode
if p.ResetMode == "always" {
p.Reset()
}
r := bytes.NewReader(buf)
return parseCSV(p, r)
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
if len(line) == 0 {
if p.SkipRows > 0 {
p.SkipRows--
if p.remainingSkipRows > 0 {
p.remainingSkipRows--
return nil, io.EOF
}
if p.MetadataRows > 0 {
p.MetadataRows--
if p.remainingMetadataRows > 0 {
p.remainingMetadataRows--
return nil, io.EOF
}
}
@@ -196,20 +228,20 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) {
lineReader := bufio.NewReader(r)
// skip first rows
for p.SkipRows > 0 {
for p.remainingSkipRows > 0 {
line, err := lineReader.ReadString('\n')
if err != nil && len(line) == 0 {
return nil, err
}
p.SkipRows--
p.remainingSkipRows--
}
// Parse metadata
for p.MetadataRows > 0 {
for p.remainingMetadataRows > 0 {
line, err := lineReader.ReadString('\n')
if err != nil && len(line) == 0 {
return nil, err
}
p.MetadataRows--
p.remainingMetadataRows--
m := p.parseMetadataRow(line)
for k, v := range m {
p.metadataTags[k] = v
@@ -221,12 +253,12 @@ func parseCSV(p *Parser, r io.Reader) ([]telegraf.Metric, error) {
// we always reread the header to avoid side effects
// in cases where multiple files with different
// headers are read
for p.HeaderRowCount > 0 {
for p.remainingHeaderRows > 0 {
header, err := csvReader.Read()
if err != nil {
return nil, err
}
p.HeaderRowCount--
p.remainingHeaderRows--
if p.gotColumnNames {
// Ignore header lines if columns are named
continue
@@ -440,6 +472,7 @@ func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.MetadataRows = config.CSVMetadataRows
p.MetadataSeparators = config.CSVMetadataSeparators
p.MetadataTrimSet = config.CSVMetadataTrimSet
p.ResetMode = "none"
return p.Init()
}
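To summarize the behavior added above, a hedged sketch of repeated `Parse()` calls on the same buffer; the import path follows the Telegraf source tree, the rest is illustrative:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	data := []byte("timestamp,type,name,status\n" +
		"2020-11-23T08:19:27+00:00,Reader,R002,1\n")

	p := &csv.Parser{
		HeaderRowCount:  1,
		TimestampColumn: "timestamp",
		TimestampFormat: "2006-01-02T15:04:05Z07:00",
		// "always" re-arms the skip/metadata/header counters on every Parse()
		// call; with the default "none" the second call below would fail on
		// the header line because the header was already consumed.
		ResetMode: "always",
	}
	if err := p.Init(); err != nil {
		panic(err)
	}

	// Both calls re-read the header and return one metric each.
	for i := 0; i < 2; i++ {
		metrics, err := p.Parse(data)
		fmt.Println("call", i, "metrics:", len(metrics), "err:", err)
	}
}
```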

View File

@@ -1046,3 +1046,424 @@ func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
require.Equal(t, expectedFields[0], m.Fields())
require.Equal(t, expectedTags[0], m.Tags())
}
func TestParseCSVResetModeInvalid(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ResetMode: "garbage",
}
require.Error(t, p.Init(), `unknown reset mode "garbage"`)
}
func TestParseCSVResetModeNone(t *testing.T) {
testCSV := `garbage nonsense that needs be skipped
# version= 1.0
invalid meta data that can be ignored.
file created: 2021-10-08T12:34:18+10:00
timestamp,type,name,status
2020-11-23T08:19:27+00:00,Reader,R002,1
#2020-11-04T13:23:04+00:00,Reader,R031,0
2020-11-04T13:29:47+00:00,Coordinator,C001,0`
expected := []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R002",
"status": int64(1),
},
time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC),
),
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
map[string]interface{}{
"name": "C001",
"status": int64(0),
},
time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC),
),
}
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
TimestampColumn: "timestamp",
TimestampFormat: "2006-01-02T15:04:05Z07:00",
ResetMode: "none",
}
require.NoError(t, p.Init())
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
// Do the parsing the first time
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
// Parsing another data line should work when not resetting
additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n"
additionalExpected := []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R009",
"status": int64(5),
},
time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC),
),
}
metrics, err = p.Parse([]byte(additionalCSV))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, additionalExpected, metrics)
// This should fail when not resetting but reading again due to the header etc
_, err = p.Parse([]byte(testCSV))
require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`)
}
func TestParseCSVLinewiseResetModeNone(t *testing.T) {
testCSV := []string{
"garbage nonsense that needs be skipped",
"",
"# version= 1.0\r\n",
"",
" invalid meta data that can be ignored.\r\n",
"file created: 2021-10-08T12:34:18+10:00",
"timestamp,type,name,status\n",
"2020-11-23T08:19:27+00:00,Reader,R002,1\r\n",
"#2020-11-04T13:23:04+00:00,Reader,R031,0\n",
"2020-11-04T13:29:47+00:00,Coordinator,C001,0",
}
expected := []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R002",
"status": int64(1),
},
time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC),
),
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
map[string]interface{}{
"name": "C001",
"status": int64(0),
},
time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC),
),
}
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
TimestampColumn: "timestamp",
TimestampFormat: "2006-01-02T15:04:05Z07:00",
ResetMode: "none",
}
require.NoError(t, p.Init())
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
// Do the parsing the first time
var metrics []telegraf.Metric
for i, r := range testCSV {
m, err := p.ParseLine(r)
// Header lines should return EOF
if m == nil {
require.Error(t, io.EOF, err)
continue
}
require.NoErrorf(t, err, "failed in row %d", i)
metrics = append(metrics, m)
}
testutil.RequireMetricsEqual(t, expected, metrics)
// Parsing another data line should work when not resetting
additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n"
additionalExpected := metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R009",
"status": int64(5),
},
time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC),
)
m, err := p.ParseLine(additionalCSV)
require.NoError(t, err)
testutil.RequireMetricEqual(t, additionalExpected, m)
// This should fail when not resetting but reading again due to the header etc
_, err = p.ParseLine(testCSV[0])
require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`)
}
func TestParseCSVResetModeAlways(t *testing.T) {
testCSV := `garbage nonsense that needs be skipped
# version= 1.0
invalid meta data that can be ignored.
file created: 2021-10-08T12:34:18+10:00
timestamp,type,name,status
2020-11-23T08:19:27+00:00,Reader,R002,1
#2020-11-04T13:23:04+00:00,Reader,R031,0
2020-11-04T13:29:47+00:00,Coordinator,C001,0`
expected := []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R002",
"status": int64(1),
},
time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC),
),
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
map[string]interface{}{
"name": "C001",
"status": int64(0),
},
time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC),
),
}
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type", "category"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
TimestampColumn: "timestamp",
TimestampFormat: "2006-01-02T15:04:05Z07:00",
ResetMode: "always",
}
require.NoError(t, p.Init())
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
// Do the parsing the first time
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
// Parsing another data line should fail as it is interpreted as header
additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n"
metrics, err = p.Parse([]byte(additionalCSV))
require.Nil(t, metrics)
require.Error(t, io.EOF, err)
// Prepare a second CSV with different column names
testCSV = `garbage nonsense that needs be skipped
# version= 1.0
invalid meta data that can be ignored.
file created: 2021-10-08T12:34:18+10:00
timestamp,category,id,flag
2020-11-23T08:19:27+00:00,Reader,R002,1
#2020-11-04T13:23:04+00:00,Reader,R031,0
2020-11-04T13:29:47+00:00,Coordinator,C001,0`
expected = []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"category": "Reader",
"version": "1.0",
},
map[string]interface{}{
"id": "R002",
"flag": int64(1),
},
time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC),
),
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"category": "Coordinator",
"version": "1.0",
},
map[string]interface{}{
"id": "C001",
"flag": int64(0),
},
time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC),
),
}
// This should work as the parser is reset
metrics, err = p.Parse([]byte(testCSV))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
}
func TestParseCSVLinewiseResetModeAlways(t *testing.T) {
testCSV := []string{
"garbage nonsense that needs be skipped",
"",
"# version= 1.0\r\n",
"",
" invalid meta data that can be ignored.\r\n",
"file created: 2021-10-08T12:34:18+10:00",
"timestamp,type,name,status\n",
"2020-11-23T08:19:27+00:00,Reader,R002,1\r\n",
"#2020-11-04T13:23:04+00:00,Reader,R031,0\n",
"2020-11-04T13:29:47+00:00,Coordinator,C001,0",
}
expected := []telegraf.Metric{
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R002",
"status": int64(1),
},
time.Date(2020, 11, 23, 8, 19, 27, 0, time.UTC),
),
metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
map[string]interface{}{
"name": "C001",
"status": int64(0),
},
time.Date(2020, 11, 4, 13, 29, 47, 0, time.UTC),
),
}
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
TimestampColumn: "timestamp",
TimestampFormat: "2006-01-02T15:04:05Z07:00",
ResetMode: "always",
}
require.NoError(t, p.Init())
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
// Do the parsing the first time
var metrics []telegraf.Metric
for i, r := range testCSV {
m, err := p.ParseLine(r)
// Header lines should return EOF
if m == nil {
require.Error(t, io.EOF, err)
continue
}
require.NoErrorf(t, err, "failed in row %d", i)
metrics = append(metrics, m)
}
testutil.RequireMetricsEqual(t, expected, metrics)
// Parsing another data line should work in line-wise parsing as
// reset-mode "always" is ignored.
additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n"
additionalExpected := metric.New(
"",
map[string]string{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
map[string]interface{}{
"name": "R009",
"status": int64(5),
},
time.Date(2021, 12, 1, 19, 1, 0, 0, time.UTC),
)
m, err := p.ParseLine(additionalCSV)
require.NoError(t, err)
testutil.RequireMetricEqual(t, additionalExpected, m)
// This should fail as reset-mode "always" is ignored in line-wise parsing
_, err = p.ParseLine(testCSV[0])
require.Error(t, err, `parsing time "garbage nonsense that needs be skipped" as "2006-01-02T15:04:05Z07:00": cannot parse "garbage nonsense that needs be skipped" as "2006"`)
}
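Finally, a hedged sketch of why the tests above expect reset-mode "always" to be ignored for line-wise parsing: each `ParseLine()` call sees a single row, so resetting per call would re-consume the header forever. The consumer loop below is illustrative only:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	p := &csv.Parser{
		HeaderRowCount:  1,
		TimestampColumn: "timestamp",
		TimestampFormat: "2006-01-02T15:04:05Z07:00",
		ResetMode:       "always", // ignored by ParseLine for the reason above
	}
	if err := p.Init(); err != nil {
		panic(err)
	}

	lines := []string{
		"timestamp,type,name,status",                   // consumed as the header, no metric yet
		"2020-11-23T08:19:27+00:00,Reader,R002,1",      // first data row
		"2020-11-04T13:29:47+00:00,Coordinator,C001,0", // second data row
	}
	for _, l := range lines {
		m, err := p.ParseLine(l)
		if m == nil {
			continue // skip/metadata/header rows yield no metric
		}
		if err != nil {
			panic(err)
		}
		fmt.Println(m.Tags(), m.Fields(), m.Time())
	}
}
```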