From 7673624bcd7d632827adb33c25c95379b001f6f1 Mon Sep 17 00:00:00 2001 From: Chase Sterling Date: Thu, 12 Oct 2023 16:32:09 -0400 Subject: [PATCH] feat(outputs.exec): Add ability to exec command once per metric (#13672) --- plugins/outputs/exec/README.md | 4 ++ plugins/outputs/exec/exec.go | 45 +++++++++++------ plugins/outputs/exec/exec_test.go | 83 +++++++++++++++++++++++++++++++ plugins/outputs/exec/sample.conf | 4 ++ 4 files changed, 122 insertions(+), 14 deletions(-) diff --git a/plugins/outputs/exec/README.md b/plugins/outputs/exec/README.md index cd16e327c..19e201278 100644 --- a/plugins/outputs/exec/README.md +++ b/plugins/outputs/exec/README.md @@ -38,6 +38,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Timeout for command to complete. # timeout = "5s" + ## Whether the command gets executed once per metric, or once per metric batch + ## The serializer will also run in batch mode when this is true. + # use_batch_format = true + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/exec/exec.go b/plugins/outputs/exec/exec.go index 67ca31992..24aacdd68 100644 --- a/plugins/outputs/exec/exec.go +++ b/plugins/outputs/exec/exec.go @@ -26,10 +26,11 @@ const maxStderrBytes = 512 // Exec defines the exec output plugin. type Exec struct { - Command []string `toml:"command"` - Environment []string `toml:"environment"` - Timeout config.Duration `toml:"timeout"` - Log telegraf.Logger `toml:"-"` + Command []string `toml:"command"` + Environment []string `toml:"environment"` + Timeout config.Duration `toml:"timeout"` + UseBatchFormat bool `toml:"use_batch_format"` + Log telegraf.Logger `toml:"-"` runner Runner serializer serializers.Serializer @@ -63,17 +64,32 @@ func (e *Exec) Close() error { // Write writes the metrics to the configured command. func (e *Exec) Write(metrics []telegraf.Metric) error { var buffer bytes.Buffer - serializedMetrics, err := e.serializer.SerializeBatch(metrics) - if err != nil { - return err - } - buffer.Write(serializedMetrics) + if e.UseBatchFormat { + serializedMetrics, err := e.serializer.SerializeBatch(metrics) + if err != nil { + return err + } + buffer.Write(serializedMetrics) - if buffer.Len() <= 0 { - return nil - } + if buffer.Len() <= 0 { + return nil + } - return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer) + return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer) + } + errs := make([]error, 0, len(metrics)) + for _, metric := range metrics { + serializedMetric, err := e.serializer.Serialize(metric) + if err != nil { + return err + } + buffer.Reset() + buffer.Write(serializedMetric) + + err = e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer) + errs = append(errs, err) + } + return errors.Join(errs...) } // Runner provides an interface for running exec.Cmd. @@ -149,7 +165,8 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string { func init() { outputs.Add("exec", func() telegraf.Output { return &Exec{ - Timeout: config.Duration(time.Second * 5), + Timeout: config.Duration(time.Second * 5), + UseBatchFormat: true, } }) } diff --git a/plugins/outputs/exec/exec_test.go b/plugins/outputs/exec/exec_test.go index 688bac4f9..3fe8fe6e7 100644 --- a/plugins/outputs/exec/exec_test.go +++ b/plugins/outputs/exec/exec_test.go @@ -2,6 +2,9 @@ package exec import ( "bytes" + "errors" + "github.com/influxdata/telegraf/metric" + "io" "strings" "testing" "time" @@ -10,10 +13,90 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + influxParser "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) +var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC) + +type MockRunner struct { + runs []int +} + +// Run runs the command. +func (c *MockRunner) Run(timeout time.Duration, command []string, environments []string, buffer io.Reader) error { + parser := influxParser.NewStreamParser(buffer) + numMetrics := 0 + + for { + _, err := parser.Next() + if err != nil { + if errors.Is(err, influxParser.EOF) { + break // stream ended + } + continue + } + numMetrics++ + } + + c.runs = append(c.runs, numMetrics) + return nil +} + +func TestExternalOutputBatch(t *testing.T) { + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + + runner := MockRunner{} + + e := &Exec{ + UseBatchFormat: true, + serializer: serializer, + Log: testutil.Logger{}, + runner: &runner, + } + + m := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + require.NoError(t, e.Connect()) + require.NoError(t, e.Write([]telegraf.Metric{m, m})) + // Make sure it executed the command once, with 2 metrics + require.Equal(t, []int{2}, runner.runs) + require.NoError(t, e.Close()) +} + +func TestExternalOutputNoBatch(t *testing.T) { + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + runner := MockRunner{} + + e := &Exec{ + UseBatchFormat: false, + serializer: serializer, + Log: testutil.Logger{}, + runner: &runner, + } + + m := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + require.NoError(t, e.Connect()) + require.NoError(t, e.Write([]telegraf.Metric{m, m})) + // Make sure it executed the command twice, both with a single metric + require.Equal(t, []int{1, 1}, runner.runs) + require.NoError(t, e.Close()) +} + func TestExec(t *testing.T) { t.Skip("Skipping test due to OS/executable dependencies and race condition when ran as part of a test-all") diff --git a/plugins/outputs/exec/sample.conf b/plugins/outputs/exec/sample.conf index 4685af1aa..5c718a97f 100644 --- a/plugins/outputs/exec/sample.conf +++ b/plugins/outputs/exec/sample.conf @@ -12,6 +12,10 @@ ## Timeout for command to complete. # timeout = "5s" + ## Whether the command gets executed once per metric, or once per metric batch + ## The serializer will also run in batch mode when this is true. + # use_batch_format = true + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: