From 4897f86ed7d470b7e6929eac1a2ebb99ce2f08e3 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 19 Sep 2022 20:15:32 +0200 Subject: [PATCH] fix(parsers.csv): Remove direct checks for the parser type (#11825) --- .../directory_monitor/directory_monitor.go | 16 ++--- .../directory_monitor_test.go | 71 +++++++++++++++++++ plugins/inputs/tail/tail.go | 18 ++--- plugins/parsers/csv/parser.go | 14 +++- plugins/parsers/csv/parser_test.go | 38 +++++----- plugins/parsers/errors.go | 9 +++ 6 files changed, 122 insertions(+), 44 deletions(-) create mode 100644 plugins/parsers/errors.go diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 4b4031996..78302e814 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -24,7 +24,6 @@ import ( "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/selfstat" ) @@ -293,17 +292,12 @@ func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Re } func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) { - switch parser.(type) { - case *csv.Parser: - metrics, err = parser.Parse(line) - if err != nil { - if errors.Is(err, io.EOF) { - return nil, nil - } - return nil, err + metrics, err = parser.Parse(line) + if err != nil { + if errors.Is(err, parsers.ErrEOF) { + return nil, nil } - default: - metrics, err = parser.Parse(line) + return nil, err } if monitor.FileTag != "" { diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 7ca4309d4..e1e4a2f90 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -99,6 +99,77 @@ func TestCSVGZImport(t *testing.T) { require.NoError(t, err) } +func TestCSVGZImportWithHeader(t *testing.T) { + acc := testutil.Accumulator{} + testCsvFile := "test.csv" + testCsvGzFile := "test.csv.gz" + + // Establish process directory and finished directory. + finishedDirectory := t.TempDir() + processDirectory := t.TempDir() + + // Init plugin. + r := DirectoryMonitor{ + Directory: processDirectory, + FinishedDirectory: finishedDirectory, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, + } + err := r.Init() + require.NoError(t, err) + + r.SetParserFunc(func() (parsers.Parser, error) { + parser := csv.Parser{ + HeaderRowCount: 1, + SkipRows: 1, + } + err := parser.Init() + return &parser, err + }) + r.Log = testutil.Logger{} + + // Write csv file to process into the 'process' directory. + f, err := os.Create(filepath.Join(processDirectory, testCsvFile)) + require.NoError(t, err) + _, err = f.WriteString("This is some garbage to be skipped\n") + require.NoError(t, err) + _, err = f.WriteString("thing,color\nsky,blue\ngrass,green\nclifford,red\n") + require.NoError(t, err) + err = f.Close() + require.NoError(t, err) + + // Write csv.gz file to process into the 'process' directory. 
+ var b bytes.Buffer + w := gzip.NewWriter(&b) + _, err = w.Write([]byte("This is some garbage to be skipped\n")) + require.NoError(t, err) + _, err = w.Write([]byte("thing,color\nsky,blue\ngrass,green\nclifford,red\n")) + require.NoError(t, err) + err = w.Close() + require.NoError(t, err) + err = os.WriteFile(filepath.Join(processDirectory, testCsvGzFile), b.Bytes(), 0666) + require.NoError(t, err) + + // Start plugin before adding file. + err = r.Start(&acc) + require.NoError(t, err) + err = r.Gather(&acc) + require.NoError(t, err) + acc.Wait(6) + r.Stop() + + // Verify that we read both files once. + require.Equal(t, len(acc.Metrics), 6) + + // File should have gone back to the test directory, as we configured. + _, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile)) + require.NoError(t, err) + + _, err = os.Stat(filepath.Join(finishedDirectory, testCsvGzFile)) + require.NoError(t, err) +} + func TestMultipleJSONFileImports(t *testing.T) { acc := testutil.Accumulator{} testJSONFile := "test.json" diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index a018276cd..692e7dbde 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -23,7 +23,6 @@ import ( "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/plugins/parsers/csv" ) //go:embed sample.conf @@ -234,19 +233,14 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { // ParseLine parses a line of text. func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) { - switch parser.(type) { - case *csv.Parser: - m, err := parser.Parse([]byte(line)) - if err != nil { - if errors.Is(err, io.EOF) { - return nil, nil - } - return nil, err + m, err := parser.Parse([]byte(line)) + if err != nil { + if errors.Is(err, parsers.ErrEOF) { + return nil, nil } - return m, err - default: - return parser.Parse([]byte(line)) + return nil, err } + return m, err } // Receiver is launched as a goroutine to continuously watch a tailed logfile diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 6e274854b..2856628b4 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "encoding/csv" + "errors" "fmt" "io" "sort" @@ -197,23 +198,30 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { } r := bytes.NewReader(buf) - return parseCSV(p, r) + metrics, err := parseCSV(p, r) + if err != nil && errors.Is(err, io.EOF) { + return nil, parsers.ErrEOF + } + return metrics, err } func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { if len(line) == 0 { if p.remainingSkipRows > 0 { p.remainingSkipRows-- - return nil, io.EOF + return nil, parsers.ErrEOF } if p.remainingMetadataRows > 0 { p.remainingMetadataRows-- - return nil, io.EOF + return nil, parsers.ErrEOF } } r := bytes.NewReader([]byte(line)) metrics, err := parseCSV(p, r) if err != nil { + if errors.Is(err, io.EOF) { + return nil, parsers.ErrEOF + } return nil, err } if len(metrics) == 1 { diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 1462d124f..9d2d1442a 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -2,7 +2,6 @@ package csv import ( "fmt" - "io" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + 
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" ) @@ -392,8 +392,7 @@ hello,80,test_name2` testCSVRows := []string{"garbage nonsense\r\n", "line1,line2,line3\r\n", "hello,80,test_name2\r\n"} metrics, err = p.Parse([]byte(testCSVRows[0])) - require.Error(t, io.EOF, err) - require.Error(t, err) + require.ErrorIs(t, err, parsers.ErrEOF) require.Nil(t, metrics) m, err := p.ParseLine(testCSVRows[1]) require.NoError(t, err) @@ -467,8 +466,7 @@ func TestMultiHeader(t *testing.T) { require.NoError(t, err) metrics, err = p.Parse([]byte(testCSVRows[0])) - require.Error(t, io.EOF, err) - require.Error(t, err) + require.ErrorIs(t, err, parsers.ErrEOF) require.Nil(t, metrics) m, err := p.ParseLine(testCSVRows[1]) require.NoError(t, err) @@ -994,8 +992,7 @@ timestamp,type,name,status rowIndex := 0 for ; rowIndex < 6; rowIndex++ { m, err := p.ParseLine(testCSVRows[rowIndex]) - require.Error(t, io.EOF, err) - require.Error(t, err) + require.ErrorIs(t, err, parsers.ErrEOF) require.Nil(t, m) } m, err := p.ParseLine(testCSVRows[rowIndex]) @@ -1031,8 +1028,7 @@ func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) { require.NoError(t, err) p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"}) m, err := p.ParseLine("second=orange") - require.Error(t, io.EOF, err) - require.Error(t, err) + require.ErrorIs(t, err, parsers.ErrEOF) require.Nil(t, m) m, err = p.ParseLine("fourth=plain") require.NoError(t, err) @@ -1212,13 +1208,16 @@ func TestParseCSVLinewiseResetModeNone(t *testing.T) { var metrics []telegraf.Metric for i, r := range testCSV { m, err := p.ParseLine(r) - // Header lines should return EOF - if m == nil { - require.Error(t, io.EOF, err) + // Header lines should return "not enough data" + if i < p.SkipRows+p.MetadataRows { + require.ErrorIs(t, err, parsers.ErrEOF) + require.Nil(t, m) continue } require.NoErrorf(t, err, "failed in row %d", i) - metrics = append(metrics, m) + if m != nil { + metrics = append(metrics, m) + } } testutil.RequireMetricsEqual(t, expected, metrics) @@ -1314,8 +1313,8 @@ timestamp,type,name,status // Parsing another data line should fail as it is interpreted as header additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n" metrics, err = p.Parse([]byte(additionalCSV)) + require.ErrorIs(t, err, parsers.ErrEOF) require.Nil(t, metrics) - require.Error(t, io.EOF, err) // Prepare a second CSV with different column names testCSV = `garbage nonsense that needs be skipped @@ -1432,13 +1431,16 @@ func TestParseCSVLinewiseResetModeAlways(t *testing.T) { var metrics []telegraf.Metric for i, r := range testCSV { m, err := p.ParseLine(r) - // Header lines should return EOF - if m == nil { - require.Error(t, io.EOF, err) + // Header lines should return "not enough data" + if i < p.SkipRows+p.MetadataRows { + require.ErrorIs(t, err, parsers.ErrEOF) + require.Nil(t, m) continue } require.NoErrorf(t, err, "failed in row %d", i) - metrics = append(metrics, m) + if m != nil { + metrics = append(metrics, m) + } } testutil.RequireMetricsEqual(t, expected, metrics) diff --git a/plugins/parsers/errors.go b/plugins/parsers/errors.go new file mode 100644 index 000000000..bc83afe89 --- /dev/null +++ b/plugins/parsers/errors.go @@ -0,0 +1,9 @@ +package parsers + +import "errors" + +// ErrEOF is similar to io.EOF but is a separate type to make sure we +// have checked the parsers using it to have the same meaning (i.e. +// it needs more data to complete parsing) and a way to detect partial +// data. 
+var ErrEOF = errors.New("not enough data")
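
Reviewer note (a sketch, not part of the patch): with parsers.ErrEOF exported, an input plugin can handle the "parser needs more data" case uniformly instead of type-switching on *csv.Parser as the removed code did. The snippet below illustrates the caller-side pattern that directory_monitor and tail now follow; the package name "example" and the helper parseLine are placeholders for illustration, not code from this commit.

    package example

    import (
        "errors"

        "github.com/influxdata/telegraf"
        "github.com/influxdata/telegraf/plugins/parsers"
    )

    // parseLine feeds one chunk of input to any parsers.Parser and treats
    // parsers.ErrEOF (e.g. the CSV parser consuming skip, metadata or
    // header rows) as "no metrics yet" rather than as a failure.
    func parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) {
        metrics, err := parser.Parse(line)
        if err != nil {
            if errors.Is(err, parsers.ErrEOF) {
                // The parser needs more data before it can emit metrics.
                return nil, nil
            }
            return nil, err
        }
        return metrics, nil
    }

Because the check uses errors.Is against an exported sentinel, callers stay agnostic of the concrete parser type, which is the point of this change.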