// telegraf/plugins/parsers/csv/parser_test.go

package csv
import (
"fmt"
"io"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
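
// DefaultTime is a fixed TimeFunc (Unix second 3600) so tests that fall back on
// the parser's time function produce deterministic timestamps.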
var DefaultTime = func() time.Time {
return time.Unix(3600, 0)
}
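
// TestBasicCSV verifies that a single line parses against explicit column names
// with one column promoted to a tag.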
func TestBasicCSV(t *testing.T) {
p := &Parser{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"third"},
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
_, err = p.ParseLine("1.4,true,hi")
require.NoError(t, err)
}
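
// TestHeaderConcatenationCSV verifies that multiple header rows are concatenated
// into column names, even when the rows have different lengths, and that the
// measurement name can be taken from a data column.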
func TestHeaderConcatenationCSV(t *testing.T) {
p := &Parser{
HeaderRowCount: 2,
MeasurementColumn: "3",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `first,second
1,2,3
3.4,70,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "test_name", metrics[0].Name())
}
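
// TestHeaderOverride verifies that ColumnNames takes precedence over the names in
// the header row, for both Parse and ParseLine.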
func TestHeaderOverride(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
3.4,70,test_name`
expectedFields := map[string]interface{}{
"first": 3.4,
"second": int64(70),
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "test_name", metrics[0].Name())
require.Equal(t, expectedFields, metrics[0].Fields())
testCSVRows := []string{"line1,line2,line3\r\n", "3.4,70,test_name\r\n"}
p = &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
metrics, err = p.Parse([]byte(testCSVRows[0]))
require.NoError(t, err)
require.Equal(t, []telegraf.Metric{}, metrics)
m, err := p.ParseLine(testCSVRows[1])
require.NoError(t, err)
require.Equal(t, "test_name", m.Name())
require.Equal(t, expectedFields, m.Fields())
}
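
// TestTimestamp verifies that the timestamp column is parsed using a Go
// reference-layout format.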
func TestTimestamp(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name
07/11/09 04:05:06 PM,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000))
}
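
// TestTimestampYYYYMMDDHHmm verifies timestamp parsing with a minute-resolution layout.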
func TestTimestampYYYYMMDDHHmm(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "200601021504",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
200905231605,70,test_name
200907111605,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094700000000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1247328300000000000))
}
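
// TestTimestampError verifies that configuring a timestamp column without a
// timestamp format is rejected.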
func TestTimestampError(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name
07/11/09 04:05:06 PM,80,test_name2`
_, err = p.Parse([]byte(testCSV))
require.Equal(t, fmt.Errorf("timestamp format must be specified"), err)
}
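
// TestTimestampUnixFormat verifies timestamp parsing with the "unix" (seconds) format.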
func TestTimestampUnixFormat(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
1243094706,70,test_name
1257609906,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000))
}
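
// TestTimestampUnixMSFormat verifies timestamp parsing with the "unix_ms" (milliseconds) format.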
func TestTimestampUnixMSFormat(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "unix_ms",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
1243094706123,70,test_name
1257609906123,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706123000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906123000000))
}
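
// TestQuotedCharacter verifies that a quoted field may contain the delimiter.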
func TestQuotedCharacter(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
"3,4",70,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "3,4", metrics[0].Fields()["first"])
}
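
// TestDelimiter verifies parsing with a custom delimiter.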
func TestDelimiter(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
Delimiter: "%",
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1%line2%line3
3,4%70%test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "3,4", metrics[0].Fields()["first"])
}
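
// TestValueConversion verifies automatic type detection and, afterwards, explicit
// conversion via ColumnTypes.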
func TestValueConversion(t *testing.T) {
p := &Parser{
HeaderRowCount: 0,
Delimiter: ",",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `3.3,4,true,hello`
expectedTags := make(map[string]string)
expectedFields := map[string]interface{}{
"first": 3.3,
"second": 4,
"third": true,
"fourth": "hello",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expectedMetric := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0))
returnedMetric := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
// deep-compare the fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
// Test explicit type conversion.
p.ColumnTypes = []string{"float", "int", "bool", "string"}
metrics, err = p.Parse([]byte(testCSV))
require.NoError(t, err)
returnedMetric = metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
// deep-compare the fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
}
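
// TestSkipComment verifies that rows starting with the comment character are ignored.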
func TestSkipComment(t *testing.T) {
p := &Parser{
HeaderRowCount: 0,
Comment: "#",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `#3.3,4,true,hello
4,9.9,true,name_this`
expectedFields := map[string]interface{}{
"first": int64(4),
"second": 9.9,
"third": true,
"fourth": "name_this",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
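
// TestTrimSpace verifies that surrounding whitespace is trimmed from values and from
// concatenated header names.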
func TestTrimSpace(t *testing.T) {
p := &Parser{
HeaderRowCount: 0,
TrimSpace: true,
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := ` 3.3, 4, true,hello`
expectedFields := map[string]interface{}{
"first": 3.3,
"second": int64(4),
"third": true,
"fourth": "hello",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
p = &Parser{
HeaderRowCount: 2,
TrimSpace: true,
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
testCSV = " col , col ,col\n" +
" 1 , 2 ,3\n" +
" test space , 80 ,test_name"
metrics, err = p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col1": "test space", "col2": int64(80), "col3": "test_name"}, metrics[0].Fields())
}
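
// TestTrimSpaceDelimitedBySpace verifies trimming when the delimiter is itself a space.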
func TestTrimSpaceDelimitedBySpace(t *testing.T) {
p := &Parser{
Delimiter: " ",
HeaderRowCount: 1,
TrimSpace: true,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := ` first second third fourth
abcdefgh 0 2 false
abcdef 3.3 4 true
f 0 2 false`
expectedFields := map[string]interface{}{
"first": "abcdef",
"second": 3.3,
"third": int64(4),
"fourth": true,
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[1].Fields())
}
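
// TestSkipRows verifies that SkipRows discards leading rows before the header,
// for both Parse and ParseLine.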
func TestSkipRows(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `garbage nonsense
line1,line2,line3
hello,80,test_name2`
expectedFields := map[string]interface{}{
"line2": int64(80),
}
expectedTags := map[string]string{
"line1": "hello",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "test_name2", metrics[0].Name())
require.Equal(t, expectedFields, metrics[0].Fields())
require.Equal(t, expectedTags, metrics[0].Tags())
p = &Parser{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
testCSVRows := []string{"garbage nonsense\r\n", "line1,line2,line3\r\n", "hello,80,test_name2\r\n"}
metrics, err = p.Parse([]byte(testCSVRows[0]))
// only the skipped row was supplied, so the parser hits end of input before a full header
require.ErrorIs(t, err, io.EOF)
require.Nil(t, metrics)
m, err := p.ParseLine(testCSVRows[1])
require.NoError(t, err)
require.Nil(t, m)
m, err = p.ParseLine(testCSVRows[2])
require.NoError(t, err)
require.Equal(t, "test_name2", m.Name())
require.Equal(t, expectedFields, m.Fields())
require.Equal(t, expectedTags, m.Tags())
}
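
// TestSkipColumns verifies that SkipColumns drops leading columns before the
// configured column names are applied.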
func TestSkipColumns(t *testing.T) {
p := &Parser{
SkipColumns: 1,
ColumnNames: []string{"line1", "line2"},
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `hello,80,test_name`
expectedFields := map[string]interface{}{
"line1": int64(80),
"line2": "test_name",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
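
// TestSkipColumnsWithHeader verifies that skipped columns are also dropped from the
// concatenated header names.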
func TestSkipColumnsWithHeader(t *testing.T) {
p := &Parser{
SkipColumns: 1,
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
testCSV := `col,col,col
1,2,3
trash,80,test_name`
// the first column is skipped, so no "col1" field should be present
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields())
}
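
// TestMultiHeader verifies that duplicate header names across two header rows are
// concatenated into unique column names, for both Parse and ParseLine.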
func TestMultiHeader(t *testing.T) {
p := &Parser{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
require.NoError(t, p.Init())
testCSV := `col,col
1,2
80,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col1": int64(80), "col2": "test_name"}, metrics[0].Fields())
testCSVRows := []string{"col,col\r\n", "1,2\r\n", "80,test_name\r\n"}
p = &Parser{
HeaderRowCount: 2,
TimeFunc: DefaultTime,
}
err = p.Init()
require.NoError(t, err)
metrics, err = p.Parse([]byte(testCSVRows[0]))
// only the first of the two header rows was supplied, so the parser hits end of input
require.ErrorIs(t, err, io.EOF)
require.Nil(t, metrics)
m, err := p.ParseLine(testCSVRows[1])
require.NoError(t, err)
require.Nil(t, m)
m, err = p.ParseLine(testCSVRows[2])
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col1": int64(80), "col2": "test_name"}, m.Fields())
}
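
// TestParseStream verifies the streaming pattern: Parse consumes the header, then
// ParseLine handles one record at a time.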
func TestParseStream(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
csvHeader := "a,b,c"
csvBody := "1,2,3"
metrics, err := p.Parse([]byte(csvHeader))
require.NoError(t, err)
require.Len(t, metrics, 0)
m, err := p.ParseLine(csvBody)
require.NoError(t, err)
testutil.RequireMetricEqual(t,
testutil.MustMetric(
"csv",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
"c": int64(3),
},
DefaultTime(),
), m)
}
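
// TestParseLineMultiMetricErrorMessage verifies that ParseLine rejects input containing
// more than one record, while Parse accepts it.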
func TestParseLineMultiMetricErrorMessage(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimeFunc: DefaultTime,
}
require.NoError(t, p.Init())
csvHeader := "a,b,c"
csvOneRow := "1,2,3"
csvTwoRows := "4,5,6\n7,8,9"
metrics, err := p.Parse([]byte(csvHeader))
require.NoError(t, err)
require.Len(t, metrics, 0)
m, err := p.ParseLine(csvOneRow)
require.NoError(t, err)
testutil.RequireMetricEqual(t,
testutil.MustMetric(
"csv",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
"c": int64(3),
},
DefaultTime(),
), m)
m, err = p.ParseLine(csvTwoRows)
require.Errorf(t, err, "expected 1 metric found 2")
require.Nil(t, m)
metrics, err = p.Parse([]byte(csvTwoRows))
require.NoError(t, err)
require.Len(t, metrics, 2)
}
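
// TestTimestampUnixFloatPrecision verifies that fractional unix timestamps keep their
// sub-second precision.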
func TestTimestampUnixFloatPrecision(t *testing.T) {
p := &Parser{
MetricName: "csv",
ColumnNames: []string{"time", "value"},
TimestampColumn: "time",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
}
err := p.Init()
require.NoError(t, err)
data := `1551129661.95456123352050781250,42`
expected := []telegraf.Metric{
testutil.MustMetric(
"csv",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(1551129661, 954561233),
),
}
metrics, err := p.Parse([]byte(data))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
}
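
// TestSkipMeasurementColumn verifies that the column consumed for the timestamp is not
// also emitted as a field.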
func TestSkipMeasurementColumn(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
}
err := p.Init()
require.NoError(t, err)
data := `id,value,timestamp
1,5,1551129661.954561233`
expected := []telegraf.Metric{
testutil.MustMetric(
"csv",
map[string]string{},
map[string]interface{}{
"id": 1,
"value": 5,
},
time.Unix(1551129661, 954561233),
),
}
metrics, err := p.Parse([]byte(data))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
}
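
// TestSkipTimestampColumn verifies that the timestamp column is excluded from the fields.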
func TestSkipTimestampColumn(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
TimestampColumn: "timestamp",
TimestampFormat: "unix",
TimeFunc: DefaultTime,
TrimSpace: true,
}
err := p.Init()
require.NoError(t, err)
data := `id,value,timestamp
1,5,1551129661.954561233`
expected := []telegraf.Metric{
testutil.MustMetric(
"csv",
map[string]string{},
map[string]interface{}{
"id": 1,
"value": 5,
},
time.Unix(1551129661, 954561233),
),
}
metrics, err := p.Parse([]byte(data))
require.NoError(t, err)
testutil.RequireMetricsEqual(t, expected, metrics)
}
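
// TestTimestampTimezone verifies that timestamps are interpreted in the configured timezone.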
func TestTimestampTimezone(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
Timezone: "Asia/Jakarta",
}
err := p.Init()
require.NoError(t, err)
testCSV := `line1,line2,line3
23/05/09 11:05:06 PM,70,test_name
07/11/09 11:05:06 PM,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000))
}
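
// TestEmptyMeasurementName verifies that an empty measurement column name falls back to
// the static MetricName.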
func TestEmptyMeasurementName(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"", "b"},
MeasurementColumn: "",
}
err := p.Init()
require.NoError(t, err)
testCSV := `,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric("csv",
map[string]string{},
map[string]interface{}{
"b": 2,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
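
// TestNumericMeasurementName verifies that a numeric column value can serve as the
// measurement name.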
func TestNumericMeasurementName(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
MeasurementColumn: "a",
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric("1",
map[string]string{},
map[string]interface{}{
"b": 2,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
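
// TestStaticMeasurementName verifies that MetricName is used when no measurement column
// is configured.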
func TestStaticMeasurementName(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric("csv",
map[string]string{},
map[string]interface{}{
"a": 1,
"b": 2,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
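
// TestSkipEmptyStringValue verifies that values listed in SkipValues (here the empty
// string) are dropped.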
func TestSkipEmptyStringValue(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{""},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,""`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric("csv",
map[string]string{},
map[string]interface{}{
"a": 1,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
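
// TestSkipSpecifiedStringValue verifies that a specific string listed in SkipValues is dropped.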
func TestSkipSpecifiedStringValue(t *testing.T) {
p := &Parser{
MetricName: "csv",
HeaderRowCount: 1,
ColumnNames: []string{"a", "b"},
SkipValues: []string{"MM"},
}
err := p.Init()
require.NoError(t, err)
testCSV := `a,b
1,MM`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric("csv",
map[string]string{},
map[string]interface{}{
"a": 1,
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime())
}
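
// TestSkipErrorOnCorruptedCSVLine verifies that SkipErrors lets the parser skip a
// malformed row instead of aborting the whole parse.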
func TestSkipErrorOnCorruptedCSVLine(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
TimestampColumn: "date",
TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
SkipErrors: true,
Log: testutil.Logger{},
}
err := p.Init()
require.NoError(t, err)
testCSV := `date,a,b
23/05/09 11:05:06 PM,1,2
corrupted_line
07/11/09 04:06:07 PM,3,4`
expectedFields0 := map[string]interface{}{
"a": int64(1),
"b": int64(2),
}
expectedFields1 := map[string]interface{}{
"a": int64(3),
"b": int64(4),
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields0, metrics[0].Fields())
require.Equal(t, expectedFields1, metrics[1].Fields())
}
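
// TestParseMetadataSeparators verifies that metadata separators are required when
// metadata rows are configured, and that the separator list is deduplicated with
// longer separators taking precedence.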
func TestParseMetadataSeparators(t *testing.T) {
p := &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 0,
MetadataSeparators: []string{},
}
err := p.Init()
require.NoError(t, err)
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{},
}
err = p.Init()
require.Error(t, err)
require.Equal(t, err.Error(), "initializing separators failed: "+
"csv_metadata_separators required when specifying csv_metadata_rows")
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{",", "=", ",", ":", "=", ":="},
}
err = p.Init()
require.NoError(t, err)
require.Len(t, p.metadataSeparatorList, 4)
require.Len(t, p.MetadataTrimSet, 0)
require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", "=", ":"})
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 1,
MetadataSeparators: []string{",", ":", "=", ":="},
MetadataTrimSet: " #'",
}
err = p.Init()
require.NoError(t, err)
require.Len(t, p.metadataSeparatorList, 4)
require.Len(t, p.MetadataTrimSet, 3)
require.Equal(t, p.metadataSeparatorList, metadataPattern{":=", ",", ":", "="})
}
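
// TestParseMetadataRow verifies how metadata rows are split into key/value pairs,
// with and without a trim set.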
func TestParseMetadataRow(t *testing.T) {
p := &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 5,
MetadataSeparators: []string{":=", ",", ":", "="},
}
err := p.Init()
require.NoError(t, err)
require.Empty(t, p.metadataTags)
m := p.parseMetadataRow("# this is a not matching string")
require.Nil(t, m)
m = p.parseMetadataRow("# key1 : value1 \r\n")
require.Equal(t, m, map[string]string{"# key1 ": " value1 "})
m = p.parseMetadataRow("key2=1234\n")
require.Equal(t, m, map[string]string{"key2": "1234"})
m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n")
require.Equal(t, m, map[string]string{" file created ": " 2021-10-08T12:34:18+10:00 "})
m = p.parseMetadataRow("file created: 2021-10-08T12:34:18\t\r\r\n")
require.Equal(t, m, map[string]string{"file created": " 2021-10-08T12:34:18\t"})
p = &Parser{
ColumnNames: []string{"a", "b"},
MetadataRows: 5,
MetadataSeparators: []string{":=", ",", ":", "="},
MetadataTrimSet: " #'",
}
err = p.Init()
require.NoError(t, err)
require.Empty(t, p.metadataTags)
m = p.parseMetadataRow("# this is a not matching string")
require.Nil(t, m)
m = p.parseMetadataRow("# key1 : value1 \r\n")
require.Equal(t, m, map[string]string{"key1": "value1"})
m = p.parseMetadataRow("key2=1234\n")
require.Equal(t, m, map[string]string{"key2": "1234"})
m = p.parseMetadataRow(" file created : 2021-10-08T12:34:18+10:00 \r\n")
require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18+10:00"})
m = p.parseMetadataRow("file created: '2021-10-08T12:34:18'\r\n")
require.Equal(t, m, map[string]string{"file created": "2021-10-08T12:34:18"})
}
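
// TestParseCSVFileWithMetadata exercises skipped rows, metadata rows, comments, and tag
// columns together, for both Parse and ParseLine.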
func TestParseCSVFileWithMetadata(t *testing.T) {
p := &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
}
err := p.Init()
require.NoError(t, err)
testCSV := `garbage nonsense that needs be skipped
# version= 1.0
invalid meta data that can be ignored.
file created: 2021-10-08T12:34:18+10:00
timestamp,type,name,status
2020-11-23T08:19:27+10:00,Reader,R002,1
#2020-11-04T13:23:04+10:00,Reader,R031,0
2020-11-04T13:29:47+10:00,Coordinator,C001,0`
expectedFields := []map[string]interface{}{
{
"name": "R002",
"status": int64(1),
"timestamp": "2020-11-23T08:19:27+10:00",
},
{
"name": "C001",
"status": int64(0),
"timestamp": "2020-11-04T13:29:47+10:00",
},
}
expectedTags := []map[string]string{
{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Reader",
"version": "1.0",
},
{
"file created": "2021-10-08T12:34:18+10:00",
"test": "tag",
"type": "Coordinator",
"version": "1.0",
},
}
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
for i, m := range metrics {
require.Equal(t, expectedFields[i], m.Fields())
require.Equal(t, expectedTags[i], m.Tags())
}
p = &Parser{
HeaderRowCount: 1,
SkipRows: 2,
MetadataRows: 4,
Comment: "#",
TagColumns: []string{"type", "version"},
MetadataSeparators: []string{":", "="},
MetadataTrimSet: " #",
}
err = p.Init()
require.NoError(t, err)
testCSVRows := []string{
"garbage nonsense that needs be skipped",
"",
"# version= 1.0\r\n",
"",
" invalid meta data that can be ignored.\r\n",
"file created: 2021-10-08T12:34:18+10:00",
"timestamp,type,name,status\n",
"2020-11-23T08:19:27+10:00,Reader,R002,1\r\n",
"#2020-11-04T13:23:04+10:00,Reader,R031,0\n",
"2020-11-04T13:29:47+10:00,Coordinator,C001,0",
}
// Set default Tags
p.SetDefaultTags(map[string]string{"test": "tag"})
rowIndex := 0
for ; rowIndex < 6; rowIndex++ {
m, err := p.ParseLine(testCSVRows[rowIndex])
// skipped and metadata rows cannot produce a metric yet; the parser reports end of input
require.ErrorIs(t, err, io.EOF)
require.Nil(t, m)
}
m, err := p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Nil(t, m)
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Equal(t, expectedFields[0], m.Fields())
require.Equal(t, expectedTags[0], m.Tags())
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Nil(t, m)
rowIndex++
m, err = p.ParseLine(testCSVRows[rowIndex])
require.NoError(t, err)
require.Equal(t, expectedFields[1], m.Fields())
require.Equal(t, expectedTags[1], m.Tags())
}
func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
// This test makes sure that the default tags and metadata tags don't overwrite record data.
// It also covers the scenario where a metadata tag overwrites a default tag.
p := &Parser{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"second", "third"},
TimeFunc: DefaultTime,
MetadataRows: 2,
MetadataSeparators: []string{"="},
}
err := p.Init()
require.NoError(t, err)
p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"})
m, err := p.ParseLine("second=orange")
// the first of the two metadata rows cannot complete parsing, so the parser reports end of input
require.ErrorIs(t, err, io.EOF)
require.Nil(t, m)
m, err = p.ParseLine("fourth=plain")
require.NoError(t, err)
require.Nil(t, m)
expectedFields := []map[string]interface{}{{"first": 1.4}}
expectedTags := []map[string]string{{"second": "orange", "third": "bye", "fourth": "car"}}
m, err = p.ParseLine("1.4,apple,hi")
require.NoError(t, err)
require.Equal(t, expectedFields[0], m.Fields())
require.Equal(t, expectedTags[0], m.Tags())
}