fix(inputs.exec): restore pre-v1.21 behavior for CSV data_format (#12533)

This commit is contained in:
Sven Rebhan 2023-01-26 21:51:39 +01:00 committed by GitHub
parent 8edcd8cb80
commit eb03bb5599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 341 additions and 0 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/secretstores"
"github.com/influxdata/telegraf/plugins/serializers"
@ -882,6 +883,14 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
}
parser := creator(parentname)
// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
if dataformat == "csv" && parentcategory == "inputs" {
if parentname == "exec" {
csvParser := parser.(*csv.Parser)
csvParser.ResetMode = "always"
}
}
conf := c.buildParser(parentname, table)
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err

View File

@ -12,8 +12,14 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
@ -299,3 +305,155 @@ func TestRemoveCarriageReturns(t *testing.T) {
}
}
}
func TestCSVBehavior(t *testing.T) {
// Setup the CSV parser
parser := &csv.Parser{
MetricName: "exec",
HeaderRowCount: 1,
ResetMode: "always",
}
require.NoError(t, parser.Init())
// Setup the plugin
plugin := NewExec()
plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""}
plugin.Log = testutil.Logger{}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
expected := []telegraf.Metric{
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}
var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}
func TestCases(t *testing.T) {
// Register the plugin
inputs.Add("exec", func() telegraf.Input {
return NewExec()
})
// Setup the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfigData([]byte(`
[[inputs.exec]]
commands = [ "echo \"a,b\n1,2\n3,4\"" ]
data_format = "csv"
csv_header_row_count = 1
`)))
require.Len(t, cfg.Inputs, 1)
plugin := cfg.Inputs[0]
require.NoError(t, plugin.Init())
expected := []telegraf.Metric{
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"exec",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}
var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}

View File

@ -11,9 +11,11 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/json"
@ -371,3 +373,80 @@ func TestStatefulParsers(t *testing.T) {
})
}
}
func TestCSVBehavior(t *testing.T) {
// Setup the CSV parser creator function
parserFunc := func() (telegraf.Parser, error) {
parser := &csv.Parser{
MetricName: "file",
HeaderRowCount: 1,
}
err := parser.Init()
return parser, err
}
// Setup the plugin
plugin := &File{
Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")},
}
plugin.SetParserFunc(parserFunc)
require.NoError(t, plugin.Init())
expected := []telegraf.Metric{
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 1),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 2),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 3),
),
metric.New(
"file",
map[string]string{},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 4),
),
}
var acc testutil.Accumulator
// Run gather once
require.NoError(t, plugin.Gather(&acc))
// Run gather a second time
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
}

View File

@ -0,0 +1,3 @@
a,b
1,2
3,4
1 a b
2 1 2
3 3 4

View File

@ -9,10 +9,12 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/grok"
@ -688,6 +690,96 @@ func TestTailEOF(t *testing.T) {
require.NoError(t, err)
}
func TestCSVBehavior(t *testing.T) {
// Prepare the input file
input, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(input.Name())
// Write header
_, err = input.WriteString("a,b\n")
require.NoError(t, err)
require.NoError(t, input.Sync())
// Setup the CSV parser creator function
parserFunc := func() (parsers.Parser, error) {
parser := &csv.Parser{
MetricName: "tail",
HeaderRowCount: 1,
}
err := parser.Init()
return parser, err
}
// Setup the plugin
plugin := &Tail{
Files: []string{input.Name()},
FromBeginning: true,
MaxUndeliveredLines: 1000,
offsets: make(map[string]int64, 0),
PathTag: "path",
Log: testutil.Logger{},
}
plugin.SetParserFunc(parserFunc)
require.NoError(t, plugin.Init())
expected := []telegraf.Metric{
metric.New(
"tail",
map[string]string{
"path": input.Name(),
},
map[string]interface{}{
"a": int64(1),
"b": int64(2),
},
time.Unix(0, 0),
),
metric.New(
"tail",
map[string]string{
"path": input.Name(),
},
map[string]interface{}{
"a": int64(3),
"b": int64(4),
},
time.Unix(0, 0),
),
}
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Write the first line of data
_, err = input.WriteString("1,2\n")
require.NoError(t, err)
require.NoError(t, input.Sync())
require.NoError(t, plugin.Gather(&acc))
// Write another line of data
_, err = input.WriteString("3,4\n")
require.NoError(t, err)
require.NoError(t, input.Sync())
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
// Close the input file
require.NoError(t, input.Close())
}
func getTestdataDir() string {
dir, err := os.Getwd()
if err != nil {