diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index 481ffd35f..75b72978f 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/process" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" ) @@ -32,12 +33,6 @@ type Execd struct { process *process.Process } -func New() *Execd { - return &Execd{ - RestartDelay: config.Duration(10 * time.Second), - } -} - func (e *Execd) SetParser(p telegraf.Parser) { e.parser = p } @@ -101,7 +96,14 @@ func (e *Execd) Stop() { func (e *Execd) cmdReadOut(out io.Reader) { // Prefer using the StreamParser when parsing influx format. - if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser { + var parser telegraf.Parser + if rp, ok := e.parser.(*models.RunningParser); ok { + parser = rp.Parser + } else { + parser = e.parser + } + + if _, isInfluxParser := parser.(*influx.Parser); isInfluxParser { e.cmdReadOutStream(out) return } @@ -175,6 +177,8 @@ func (e *Execd) Init() error { func init() { processors.AddStreaming("execd", func() telegraf.StreamingProcessor { - return New() + return &Execd{ + RestartDelay: config.Duration(10 * time.Second), + } }) } diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 88d2e63ad..1b45c3ebd 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -24,101 +25,94 @@ import ( ) func TestExternalProcessorWorks(t *testing.T) { - e := New() - e.Log = testutil.Logger{} + // Determine name of the test executable for mocking an external program + exe, err := os.Executable() + require.NoError(t, err) + // Setup the plugin + plugin := &Execd{ + Command: []string{ + exe, + "-case", "multiply", + "-field", "count", + }, + Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"}, + RestartDelay: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + + // Setup the parser and serializer in the processor parser := &influx.Parser{} require.NoError(t, parser.Init()) - e.SetParser(parser) + plugin.SetParser(parser) serializer := &serializers_influx.Serializer{} require.NoError(t, serializer.Init()) - e.SetSerializer(serializer) - - exe, err := os.Executable() - require.NoError(t, err) - t.Log(exe) - e.Command = []string{exe, "-countmultiplier"} - e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"} - e.RestartDelay = config.Duration(5 * time.Second) - - acc := &testutil.Accumulator{} - - require.NoError(t, e.Start(acc)) + plugin.SetSerializer(serializer) + // Setup the input and expected output metrucs now := time.Now() - orig := now + var input []telegraf.Metric + var expected []telegraf.Metric for i := 0; i < 10; i++ { - m := metric.New("test", - map[string]string{ - "city": "Toronto", - }, - map[string]interface{}{ - "population": 6000000, - "count": 1, - }, - now) - now = now.Add(1) + m := metric.New( + "test", + map[string]string{"city": "Toronto"}, + map[string]interface{}{"population": 6000000, "count": 1}, + now.Add(time.Duration(i)), + ) + input = append(input, m) - require.NoError(t, e.Add(m, acc)) + e := m.Copy() + e.AddField("count", 2) + expected = append(expected, e) } - acc.Wait(1) - e.Stop() - acc.Wait(9) - - metrics := acc.GetTelegrafMetrics() - m := metrics[0] - - expected := testutil.MustMetric("test", - map[string]string{ - "city": "Toronto", - }, - map[string]interface{}{ - "population": 6000000, - "count": 2, - }, - orig, - ) - testutil.RequireMetricEqual(t, expected, m) - - metricTime := m.Time().UnixNano() - - // make sure the other 9 are ordered properly - for i := 0; i < 9; i++ { - m = metrics[i+1] - require.EqualValues(t, metricTime+1, m.Time().UnixNano()) - metricTime = m.Time().UnixNano() + // Perform the test and check the result + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + for _, m := range input { + require.NoError(t, plugin.Add(m, &acc)) } + + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } func TestParseLinesWithNewLines(t *testing.T) { - e := New() - e.Log = testutil.Logger{} + // Determine name of the test executable for mocking an external program + exe, err := os.Executable() + require.NoError(t, err) + // Setup the plugin + plugin := &Execd{ + Command: []string{ + exe, + "-case", "multiply", + "-field", "count", + }, + Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"}, + RestartDelay: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + + // Setup the parser and serializer in the processor parser := &influx.Parser{} require.NoError(t, parser.Init()) - e.SetParser(parser) + plugin.SetParser(parser) serializer := &serializers_influx.Serializer{} require.NoError(t, serializer.Init()) - e.SetSerializer(serializer) - - exe, err := os.Executable() - require.NoError(t, err) - t.Log(exe) - e.Command = []string{exe, "-countmultiplier"} - e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"} - e.RestartDelay = config.Duration(5 * time.Second) - - acc := &testutil.Accumulator{} - - require.NoError(t, e.Start(acc)) + plugin.SetSerializer(serializer) + // Setup the input and expected output metrucs now := time.Now() - orig := now - - m := metric.New("test", + input := metric.New( + "test", map[string]string{ "author": "Mr. Gopher", }, @@ -126,93 +120,90 @@ func TestParseLinesWithNewLines(t *testing.T) { "phrase": "Gophers are amazing creatures.\nAbsolutely amazing.", "count": 3, }, - now) - - require.NoError(t, e.Add(m, acc)) - - acc.Wait(1) - e.Stop() - - processedMetric := acc.GetTelegrafMetrics()[0] - - expectedMetric := testutil.MustMetric("test", - map[string]string{ - "author": "Mr. Gopher", - }, - map[string]interface{}{ - "phrase": "Gophers are amazing creatures.\nAbsolutely amazing.", - "count": 6, - }, - orig, + now, ) - - testutil.RequireMetricEqual(t, expectedMetric, processedMetric) -} - -var countmultiplier = flag.Bool("countmultiplier", false, - "if true, act like line input program instead of test") - -func TestMain(m *testing.M) { - flag.Parse() - runMode := os.Getenv("PLUGINS_PROCESSORS_EXECD_MODE") - if *countmultiplier && runMode == "application" { - runCountMultiplierProgram() - os.Exit(0) + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"author": "Mr. Gopher"}, + map[string]interface{}{ + "phrase": "Gophers are amazing creatures.\nAbsolutely amazing.", + "count": 6, + }, + now, + ), } - code := m.Run() - os.Exit(code) + + // Perform the test and check the result + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Add(input, &acc)) + + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } -func runCountMultiplierProgram() { - fieldName := os.Getenv("FIELD_NAME") - parser := influx.NewStreamParser(os.Stdin) +func TestLongLinesForLineProtocol(t *testing.T) { + // Determine name of the test executable for mocking an external program + exe, err := os.Executable() + require.NoError(t, err) + + // Setup the plugin + plugin := &Execd{ + Command: []string{ + exe, + "-case", "long", + "-field", "long", + }, + Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"}, + RestartDelay: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + + // Setup the parser and serializer in the processor + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + serializer := &serializers_influx.Serializer{} - //nolint:errcheck // this should always succeed - serializer.Init() + require.NoError(t, serializer.Init()) + plugin.SetSerializer(serializer) - for { - m, err := parser.Next() - if err != nil { - if errors.Is(err, influx.EOF) { - return // stream ended - } - var parseErr *influx.ParseError - if errors.As(err, &parseErr) { - fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) - //nolint:revive // os.Exit called intentionally - os.Exit(1) - } - fmt.Fprintf(os.Stderr, "ERR %v\n", err) - //nolint:revive // os.Exit called intentionally - os.Exit(1) - } - - c, found := m.GetField(fieldName) - if !found { - fmt.Fprintf(os.Stderr, "metric has no %s field\n", fieldName) - //nolint:revive // os.Exit called intentionally - os.Exit(1) - } - switch t := c.(type) { - case float64: - t *= 2 - m.AddField(fieldName, t) - case int64: - t *= 2 - m.AddField(fieldName, t) - default: - fmt.Fprintf(os.Stderr, "%s is not an unknown type, it's a %T\n", fieldName, c) - //nolint:revive // os.Exit called intentionally - os.Exit(1) - } - b, err := serializer.Serialize(m) - if err != nil { - fmt.Fprintf(os.Stderr, "ERR %v\n", err) - //nolint:revive // os.Exit called intentionally - os.Exit(1) - } - fmt.Fprint(os.Stdout, string(b)) + // Setup the input and expected output metrucs + now := time.Now() + input := metric.New( + "test", + map[string]string{"author": "Mr. Gopher"}, + map[string]interface{}{"count": 3}, + now, + ) + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"author": "Mr. Gopher"}, + map[string]interface{}{ + "long": strings.Repeat("foobar", 280_000/6), + "count": 3, + }, + now, + ), } + + // Perform the test and check the result + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Add(input, &acc)) + + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } func TestCases(t *testing.T) { @@ -225,7 +216,7 @@ func TestCases(t *testing.T) { // Set up for file inputs processors.AddStreaming("execd", func() telegraf.StreamingProcessor { - return New() + return &Execd{RestartDelay: config.Duration(10 * time.Second)} }) for _, f := range folders { @@ -266,8 +257,6 @@ func TestCases(t *testing.T) { plugin.Stop() require.Eventually(t, func() bool { - acc.Lock() - defer acc.Unlock() return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond) @@ -353,8 +342,12 @@ func TestTracking(t *testing.T) { require.NoError(t, err) plugin := &Execd{ - Command: []string{exe, "-countmultiplier"}, - Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"}, + Command: []string{ + exe, + "-case", "multiply", + "-field", "count", + }, + Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"}, RestartDelay: config.Duration(5 * time.Second), Log: testutil.Logger{}, } @@ -395,3 +388,105 @@ func TestTracking(t *testing.T) { return len(input) == len(delivered) }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) } + +func TestMain(m *testing.M) { + var testcase, field string + flag.StringVar(&testcase, "case", "", "test-case to mock [multiply, long]") + flag.StringVar(&field, "field", "count", "name of the field to multiply") + flag.Parse() + + if os.Getenv("PLUGINS_PROCESSORS_EXECD_MODE") != "application" || testcase == "" { + os.Exit(m.Run()) + } + + switch testcase { + case "multiply": + os.Exit(runTestCaseMultiply(field)) + case "long": + os.Exit(runTestCaseLong(field)) + } + os.Exit(5) +} + +func runTestCaseMultiply(field string) int { + parser := influx.NewStreamParser(os.Stdin) + serializer := &serializers_influx.Serializer{} + if err := serializer.Init(); err != nil { + fmt.Fprintf(os.Stderr, "initialization ERR %v\n", err) + return 1 + } + + for { + m, err := parser.Next() + if err != nil { + if errors.Is(err, influx.EOF) { + return 0 + } + var parseErr *influx.ParseError + if errors.As(err, &parseErr) { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + return 1 + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + return 1 + } + + c, found := m.GetField(field) + if !found { + fmt.Fprintf(os.Stderr, "metric has no field %q\n", field) + return 1 + } + switch t := c.(type) { + case float64: + m.AddField(field, t*2) + case int64: + m.AddField(field, t*2) + default: + fmt.Fprintf(os.Stderr, "%s has an unknown type, it's a %T\n", field, c) + return 1 + } + b, err := serializer.Serialize(m) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + return 1 + } + fmt.Fprint(os.Stdout, string(b)) + } +} + +func runTestCaseLong(field string) int { + parser := influx.NewStreamParser(os.Stdin) + serializer := &serializers_influx.Serializer{} + if err := serializer.Init(); err != nil { + fmt.Fprintf(os.Stderr, "initialization ERR %v\n", err) + return 1 + } + + // Setup a field with a lot of characters to exceed the scanner limit + long := strings.Repeat("foobar", 280_000/6) + + for { + m, err := parser.Next() + if err != nil { + if errors.Is(err, influx.EOF) { + return 0 + } + var parseErr *influx.ParseError + if errors.As(err, &parseErr) { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + return 1 + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + return 1 + } + + m.AddField(field, long) + + b, err := serializer.Serialize(m) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + return 1 + } + fmt.Fprint(os.Stdout, string(b)) + } +}