feat(parsers.csv): support null delimiters (#12247)
This commit is contained in:
parent
35b5476c83
commit
e264721cb9
|
|
@ -59,6 +59,10 @@ values.
|
||||||
|
|
||||||
## The separator between csv fields
|
## The separator between csv fields
|
||||||
## By default, the parser assumes a comma (",")
|
## By default, the parser assumes a comma (",")
|
||||||
|
## Please note that if you use invalid delimiters (e.g. "\u0000"), commas
|
||||||
|
## will be changed to "\ufffd", the invalid delimiters changed to a comma
|
||||||
|
## during parsing, and afterwards the invalid characters and commas are
|
||||||
|
## returned to their original values.
|
||||||
csv_delimiter = ","
|
csv_delimiter = ","
|
||||||
|
|
||||||
## The character reserved for marking a row as a comment row
|
## The character reserved for marking a row as a comment row
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
_ "time/tzdata" // needed to bundle timezone info into the binary for Windows
|
_ "time/tzdata" // needed to bundle timezone info into the binary for Windows
|
||||||
|
|
||||||
|
|
@ -23,6 +24,9 @@ import (
|
||||||
|
|
||||||
type TimeFunc func() time.Time
|
type TimeFunc func() time.Time
|
||||||
|
|
||||||
|
// Placeholder strings used while parsing input whose configured delimiter
// is not accepted by encoding/csv. Despite the "Byte" suffix both values
// are strings.
const (
	// replacementByte is the Unicode replacement character (U+FFFD); literal
	// commas in the input are swapped for it before parsing.
	replacementByte = "\ufffd"
	// commaByte is a plain comma (U+002C); it stands in for the invalid
	// delimiter so that the CSV reader can split on it.
	commaByte = "\u002C"
)
|
||||||
|
|
||||||
type Parser struct {
|
type Parser struct {
|
||||||
ColumnNames []string `toml:"csv_column_names"`
|
ColumnNames []string `toml:"csv_column_names"`
|
||||||
ColumnTypes []string `toml:"csv_column_types"`
|
ColumnTypes []string `toml:"csv_column_types"`
|
||||||
|
|
@ -51,6 +55,8 @@ type Parser struct {
|
||||||
|
|
||||||
gotColumnNames bool
|
gotColumnNames bool
|
||||||
|
|
||||||
|
invalidDelimiter bool
|
||||||
|
|
||||||
TimeFunc func() time.Time
|
TimeFunc func() time.Time
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
metadataTags map[string]string
|
metadataTags map[string]string
|
||||||
|
|
@ -141,6 +147,7 @@ func (p *Parser) Init() error {
|
||||||
if len(runeStr) > 1 {
|
if len(runeStr) > 1 {
|
||||||
return fmt.Errorf("csv_delimiter must be a single character, got: %s", p.Delimiter)
|
return fmt.Errorf("csv_delimiter must be a single character, got: %s", p.Delimiter)
|
||||||
}
|
}
|
||||||
|
p.invalidDelimiter = !validDelim(runeStr[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Comment != "" {
|
if p.Comment != "" {
|
||||||
|
|
@ -182,9 +189,13 @@ func (p *Parser) compile(r io.Reader) *csv.Reader {
|
||||||
csvReader := csv.NewReader(r)
|
csvReader := csv.NewReader(r)
|
||||||
// ensures that the reader reads records of different lengths without an error
|
// ensures that the reader reads records of different lengths without an error
|
||||||
csvReader.FieldsPerRecord = -1
|
csvReader.FieldsPerRecord = -1
|
||||||
if p.Delimiter != "" {
|
if !p.invalidDelimiter && p.Delimiter != "" {
|
||||||
csvReader.Comma = []rune(p.Delimiter)[0]
|
csvReader.Comma = []rune(p.Delimiter)[0]
|
||||||
}
|
}
|
||||||
|
// Check if delimiter is invalid
|
||||||
|
if p.invalidDelimiter && p.Delimiter != "" {
|
||||||
|
csvReader.Comma = []rune(commaByte)[0]
|
||||||
|
}
|
||||||
if p.Comment != "" {
|
if p.Comment != "" {
|
||||||
csvReader.Comment = []rune(p.Comment)[0]
|
csvReader.Comment = []rune(p.Comment)[0]
|
||||||
}
|
}
|
||||||
|
|
@ -192,12 +203,23 @@ func (p *Parser) compile(r io.Reader) *csv.Reader {
|
||||||
return csvReader
|
return csvReader
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validDelim reports whether r is acceptable as a CSV field delimiter.
// NUL, the double quote, carriage return, line feed, the Unicode
// replacement character and any rune that is not valid UTF-8 are rejected.
//
// The rule is taken from the upstream Go implementation, see
// https://github.com/golang/go/blob/release-branch.go1.19/src/encoding/csv/reader.go#L95
func validDelim(r rune) bool {
	switch r {
	case 0, '"', '\r', '\n', utf8.RuneError:
		return false
	}
	return utf8.ValidRune(r)
}
|
||||||
|
|
||||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
// Reset the parser according to the specified mode
|
// Reset the parser according to the specified mode
|
||||||
if p.ResetMode == "always" {
|
if p.ResetMode == "always" {
|
||||||
p.Reset()
|
p.Reset()
|
||||||
}
|
}
|
||||||
|
// If using an invalid delimiter, replace commas with replacement and
|
||||||
|
// invalid delimiter with commas
|
||||||
|
if p.invalidDelimiter {
|
||||||
|
buf = bytes.Replace(buf, []byte(commaByte), []byte(replacementByte), -1)
|
||||||
|
buf = bytes.Replace(buf, []byte(p.Delimiter), []byte(commaByte), -1)
|
||||||
|
}
|
||||||
r := bytes.NewReader(buf)
|
r := bytes.NewReader(buf)
|
||||||
metrics, err := parseCSV(p, r)
|
metrics, err := parseCSV(p, r)
|
||||||
if err != nil && errors.Is(err, io.EOF) {
|
if err != nil && errors.Is(err, io.EOF) {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package csv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -221,6 +222,24 @@ func TestDelimiter(t *testing.T) {
|
||||||
require.Equal(t, "3,4", metrics[0].Fields()["first"])
|
require.Equal(t, "3,4", metrics[0].Fields()["first"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNullDelimiter(t *testing.T) {
|
||||||
|
p := &Parser{
|
||||||
|
HeaderRowCount: 0,
|
||||||
|
Delimiter: "\u0000",
|
||||||
|
ColumnNames: []string{"first", "second", "third"},
|
||||||
|
TimeFunc: DefaultTime,
|
||||||
|
}
|
||||||
|
err := p.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testCSV := strings.Join([]string{"3.4", "70", "test_name"}, "\u0000")
|
||||||
|
metrics, err := p.Parse([]byte(testCSV))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, float64(3.4), metrics[0].Fields()["first"])
|
||||||
|
require.Equal(t, int64(70), metrics[0].Fields()["second"])
|
||||||
|
require.Equal(t, "test_name", metrics[0].Fields()["third"])
|
||||||
|
}
|
||||||
|
|
||||||
func TestValueConversion(t *testing.T) {
|
func TestValueConversion(t *testing.T) {
|
||||||
p := &Parser{
|
p := &Parser{
|
||||||
HeaderRowCount: 0,
|
HeaderRowCount: 0,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue