diff --git a/config/config.go b/config/config.go index 20c817fa4..b4ccf9fe0 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/secretstores" "github.com/influxdata/telegraf/plugins/serializers" @@ -882,6 +883,14 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) } parser := creator(parentname) + // Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022) + if dataformat == "csv" && parentcategory == "inputs" { + if parentname == "exec" { + csvParser := parser.(*csv.Parser) + csvParser.ResetMode = "always" + } + } + conf := c.buildParser(parentname, table) if err := c.toml.UnmarshalTable(table, parser); err != nil { return nil, err diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 958f9dcac..3ac20f0fe 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -12,8 +12,14 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" @@ -299,3 +305,155 @@ func TestRemoveCarriageReturns(t *testing.T) { } } } + +func TestCSVBehavior(t *testing.T) { + // Setup the CSV parser + parser := &csv.Parser{ + MetricName: "exec", + HeaderRowCount: 1, + ResetMode: "always", + } + require.NoError(t, parser.Init()) + + // Setup the plugin + plugin := NewExec() + plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} + plugin.Log = testutil.Logger{} + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestCases(t *testing.T) { + // Register the plugin + inputs.Add("exec", func() telegraf.Input { + return NewExec() + }) + + // Setup the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfigData([]byte(` + [[inputs.exec]] + commands = [ "echo \"a,b\n1,2\n3,4\"" ] + data_format = "csv" + csv_header_row_count = 1 +`))) + require.Len(t, cfg.Inputs, 1) + plugin := cfg.Inputs[0] + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index ffcc02dd1..1dac3ea44 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -11,9 +11,11 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/json" @@ -371,3 +373,80 @@ func TestStatefulParsers(t *testing.T) { }) } } + +func TestCSVBehavior(t *testing.T) { + // Setup the CSV parser creator function + parserFunc := func() (telegraf.Parser, error) { + parser := &csv.Parser{ + MetricName: "file", + HeaderRowCount: 1, + } + err := parser.Init() + return parser, err + } + + // Setup the plugin + plugin := &File{ + Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")}, + } + plugin.SetParserFunc(parserFunc) + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 1), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 2), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 3), + ), + metric.New( + "file", + map[string]string{}, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 4), + ), + } + + var acc testutil.Accumulator + // Run gather once + require.NoError(t, plugin.Gather(&acc)) + // Run gather a second time + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} diff --git a/plugins/inputs/file/testdata/csv_behavior_input.csv b/plugins/inputs/file/testdata/csv_behavior_input.csv new file mode 100644 index 000000000..0099ae937 --- /dev/null +++ b/plugins/inputs/file/testdata/csv_behavior_input.csv @@ -0,0 +1,3 @@ +a,b +1,2 +3,4 diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index e9e2bcdbf..303a069f3 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -9,10 +9,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/grok" @@ -688,6 +690,96 @@ func TestTailEOF(t *testing.T) { require.NoError(t, err) } +func TestCSVBehavior(t *testing.T) { + // Prepare the input file + input, err := os.CreateTemp("", "") + require.NoError(t, err) + defer os.Remove(input.Name()) + // Write header + _, err = input.WriteString("a,b\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + + // Setup the CSV parser creator function + parserFunc := func() (parsers.Parser, error) { + parser := &csv.Parser{ + MetricName: "tail", + HeaderRowCount: 1, + } + err := parser.Init() + return parser, err + } + + // Setup the plugin + plugin := &Tail{ + Files: []string{input.Name()}, + FromBeginning: true, + MaxUndeliveredLines: 1000, + offsets: make(map[string]int64, 0), + PathTag: "path", + Log: testutil.Logger{}, + } + plugin.SetParserFunc(parserFunc) + require.NoError(t, plugin.Init()) + + expected := []telegraf.Metric{ + metric.New( + "tail", + map[string]string{ + "path": input.Name(), + }, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + }, + time.Unix(0, 0), + ), + metric.New( + "tail", + map[string]string{ + "path": input.Name(), + }, + map[string]interface{}{ + "a": int64(3), + "b": int64(4), + }, + time.Unix(0, 0), + ), + } + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Write the first line of data + _, err = input.WriteString("1,2\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + + // Write another line of data + _, err = input.WriteString("3,4\n") + require.NoError(t, err) + require.NoError(t, input.Sync()) + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) + + // Close the input file + require.NoError(t, input.Close()) +} + func getTestdataDir() string { dir, err := os.Getwd() if err != nil {