fix(parsers.csv): Remove direct checks for the parser type (#11825)

This commit is contained in:
Sven Rebhan 2022-09-19 20:15:32 +02:00 committed by GitHub
parent 6236059817
commit 4897f86ed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 44 deletions

View File

@ -24,7 +24,6 @@ import (
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/selfstat"
)
@ -293,17 +292,12 @@ func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Re
}
func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
switch parser.(type) {
case *csv.Parser:
metrics, err = parser.Parse(line)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
metrics, err = parser.Parse(line)
if err != nil {
if errors.Is(err, parsers.ErrEOF) {
return nil, nil
}
default:
metrics, err = parser.Parse(line)
return nil, err
}
if monitor.FileTag != "" {

View File

@ -99,6 +99,77 @@ func TestCSVGZImport(t *testing.T) {
require.NoError(t, err)
}
func TestCSVGZImportWithHeader(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"
testCsvGzFile := "test.csv.gz"
// Establish process directory and finished directory.
finishedDirectory := t.TempDir()
processDirectory := t.TempDir()
// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
parser := csv.Parser{
HeaderRowCount: 1,
SkipRows: 1,
}
err := parser.Init()
return &parser, err
})
r.Log = testutil.Logger{}
// Write csv file to process into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
require.NoError(t, err)
_, err = f.WriteString("This is some garbage to be skipped\n")
require.NoError(t, err)
_, err = f.WriteString("thing,color\nsky,blue\ngrass,green\nclifford,red\n")
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
// Write csv.gz file to process into the 'process' directory.
var b bytes.Buffer
w := gzip.NewWriter(&b)
_, err = w.Write([]byte("This is some garbage to be skipped\n"))
require.NoError(t, err)
_, err = w.Write([]byte("thing,color\nsky,blue\ngrass,green\nclifford,red\n"))
require.NoError(t, err)
err = w.Close()
require.NoError(t, err)
err = os.WriteFile(filepath.Join(processDirectory, testCsvGzFile), b.Bytes(), 0666)
require.NoError(t, err)
// Start plugin before adding file.
err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(6)
r.Stop()
// Verify that we read both files once.
require.Equal(t, len(acc.Metrics), 6)
// File should have gone back to the test directory, as we configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvGzFile))
require.NoError(t, err)
}
func TestMultipleJSONFileImports(t *testing.T) {
acc := testutil.Accumulator{}
testJSONFile := "test.json"

View File

@ -23,7 +23,6 @@ import (
"github.com/influxdata/telegraf/plugins/common/encoding"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
)
//go:embed sample.conf
@ -234,19 +233,14 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
// ParseLine parses a line of text.
func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) {
switch parser.(type) {
case *csv.Parser:
m, err := parser.Parse([]byte(line))
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
m, err := parser.Parse([]byte(line))
if err != nil {
if errors.Is(err, parsers.ErrEOF) {
return nil, nil
}
return m, err
default:
return parser.Parse([]byte(line))
return nil, err
}
return m, err
}
// Receiver is launched as a goroutine to continuously watch a tailed logfile

View File

@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/csv"
"errors"
"fmt"
"io"
"sort"
@ -197,23 +198,30 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
}
r := bytes.NewReader(buf)
return parseCSV(p, r)
metrics, err := parseCSV(p, r)
if err != nil && errors.Is(err, io.EOF) {
return nil, parsers.ErrEOF
}
return metrics, err
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
if len(line) == 0 {
if p.remainingSkipRows > 0 {
p.remainingSkipRows--
return nil, io.EOF
return nil, parsers.ErrEOF
}
if p.remainingMetadataRows > 0 {
p.remainingMetadataRows--
return nil, io.EOF
return nil, parsers.ErrEOF
}
}
r := bytes.NewReader([]byte(line))
metrics, err := parseCSV(p, r)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, parsers.ErrEOF
}
return nil, err
}
if len(metrics) == 1 {

View File

@ -2,7 +2,6 @@ package csv
import (
"fmt"
"io"
"testing"
"time"
@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
)
@ -392,8 +392,7 @@ hello,80,test_name2`
testCSVRows := []string{"garbage nonsense\r\n", "line1,line2,line3\r\n", "hello,80,test_name2\r\n"}
metrics, err = p.Parse([]byte(testCSVRows[0]))
require.Error(t, io.EOF, err)
require.Error(t, err)
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, metrics)
m, err := p.ParseLine(testCSVRows[1])
require.NoError(t, err)
@ -467,8 +466,7 @@ func TestMultiHeader(t *testing.T) {
require.NoError(t, err)
metrics, err = p.Parse([]byte(testCSVRows[0]))
require.Error(t, io.EOF, err)
require.Error(t, err)
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, metrics)
m, err := p.ParseLine(testCSVRows[1])
require.NoError(t, err)
@ -994,8 +992,7 @@ timestamp,type,name,status
rowIndex := 0
for ; rowIndex < 6; rowIndex++ {
m, err := p.ParseLine(testCSVRows[rowIndex])
require.Error(t, io.EOF, err)
require.Error(t, err)
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, m)
}
m, err := p.ParseLine(testCSVRows[rowIndex])
@ -1031,8 +1028,7 @@ func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
require.NoError(t, err)
p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"})
m, err := p.ParseLine("second=orange")
require.Error(t, io.EOF, err)
require.Error(t, err)
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, m)
m, err = p.ParseLine("fourth=plain")
require.NoError(t, err)
@ -1212,13 +1208,16 @@ func TestParseCSVLinewiseResetModeNone(t *testing.T) {
var metrics []telegraf.Metric
for i, r := range testCSV {
m, err := p.ParseLine(r)
// Header lines should return EOF
if m == nil {
require.Error(t, io.EOF, err)
// Header lines should return "not enough data"
if i < p.SkipRows+p.MetadataRows {
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, m)
continue
}
require.NoErrorf(t, err, "failed in row %d", i)
metrics = append(metrics, m)
if m != nil {
metrics = append(metrics, m)
}
}
testutil.RequireMetricsEqual(t, expected, metrics)
@ -1314,8 +1313,8 @@ timestamp,type,name,status
// Parsing another data line should fail as it is interpreted as header
additionalCSV := "2021-12-01T19:01:00+00:00,Reader,R009,5\r\n"
metrics, err = p.Parse([]byte(additionalCSV))
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, metrics)
require.Error(t, io.EOF, err)
// Prepare a second CSV with different column names
testCSV = `garbage nonsense that needs be skipped
@ -1432,13 +1431,16 @@ func TestParseCSVLinewiseResetModeAlways(t *testing.T) {
var metrics []telegraf.Metric
for i, r := range testCSV {
m, err := p.ParseLine(r)
// Header lines should return EOF
if m == nil {
require.Error(t, io.EOF, err)
// Header lines should return "not enough data"
if i < p.SkipRows+p.MetadataRows {
require.ErrorIs(t, err, parsers.ErrEOF)
require.Nil(t, m)
continue
}
require.NoErrorf(t, err, "failed in row %d", i)
metrics = append(metrics, m)
if m != nil {
metrics = append(metrics, m)
}
}
testutil.RequireMetricsEqual(t, expected, metrics)

View File

@ -0,0 +1,9 @@
package parsers
import "errors"
// ErrEOF is similar to io.EOF but is a separate type to make sure we
// have checked the parsers using it to have the same meaning (i.e.
// it needs more data to complete parsing) and a way to detect partial
// data.
var ErrEOF = errors.New("not enough data")