diff --git a/plugins/outputs/execd/README.md b/plugins/outputs/execd/README.md index 696856f8a..c5e9f17d0 100644 --- a/plugins/outputs/execd/README.md +++ b/plugins/outputs/execd/README.md @@ -34,8 +34,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Flag to determine whether execd should throw error when part of metrics is unserializable ## Setting this to true will skip the unserializable metrics and process the rest of metrics ## Setting this to false will throw error when encountering unserializable metrics and none will be processed + ## This setting does not apply when use_batch_format is set. # ignore_serialization_error = false + ## Use batch serialization instead of per metric. The batch format allows for the + ## production of batch output formats and may more efficiently encode and write metrics. + # use_batch_format = false + ## Data format to export. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/execd/execd.go b/plugins/outputs/execd/execd.go index 06bd5a4ac..016e3eb0d 100644 --- a/plugins/outputs/execd/execd.go +++ b/plugins/outputs/execd/execd.go @@ -24,6 +24,7 @@ type Execd struct { Environment []string `toml:"environment"` RestartDelay config.Duration `toml:"restart_delay"` IgnoreSerializationError bool `toml:"ignore_serialization_error"` + UseBatchFormat bool `toml:"use_batch_format"` Log telegraf.Logger process *process.Process @@ -78,6 +79,17 @@ func (e *Execd) Close() error { } func (e *Execd) Write(metrics []telegraf.Metric) error { + if e.UseBatchFormat { + b, err := e.serializer.SerializeBatch(metrics) + if err != nil { + return fmt.Errorf("error serializing metrics: %w", err) + } + + if _, err = e.process.Stdin.Write(b); err != nil { + return fmt.Errorf("error writing metrics: %w", err) + } + return nil + } for _, m := range metrics { b, err := e.serializer.Serialize(m) if err != nil { diff --git a/plugins/outputs/execd/execd_test.go b/plugins/outputs/execd/execd_test.go index e501ccf31..eae28d19c 100644 --- a/plugins/outputs/execd/execd_test.go +++ b/plugins/outputs/execd/execd_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "sync" "testing" @@ -33,7 +34,7 @@ func TestExternalOutputWorks(t *testing.T) { e := &Execd{ Command: []string{exe, "-testoutput"}, - Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, + Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu", "METRIC_NUM=1"}, RestartDelay: config.Duration(5 * time.Second), serializer: serializer, Log: testutil.Logger{}, @@ -71,6 +72,61 @@ func TestExternalOutputWorks(t *testing.T) { wg.Wait() } +func TestBatchOutputWorks(t *testing.T) { + serializer := &influxSerializer.Serializer{} + require.NoError(t, serializer.Init()) + + exe, err := os.Executable() + require.NoError(t, err) + + e := &Execd{ + Command: []string{exe, "-testoutput"}, + Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu", "METRIC_NUM=2"}, + RestartDelay: config.Duration(5 * time.Second), + UseBatchFormat: true, + serializer: serializer, + Log: testutil.Logger{}, + } + + require.NoError(t, e.Init()) + + wg := &sync.WaitGroup{} + wg.Add(1) + e.process.ReadStderrFn = func(rstderr io.Reader) { + scanner := bufio.NewScanner(rstderr) + + for scanner.Scan() { + t.Errorf("stderr: %q", scanner.Text()) + } + + if err := scanner.Err(); err != nil { + if !strings.HasSuffix(err.Error(), "already closed") { + t.Errorf("error reading stderr: %v", err) + } + } + wg.Done() + } + + m := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + m2 := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + require.NoError(t, e.Connect()) + require.NoError(t, e.Write([]telegraf.Metric{m, m2})) + require.NoError(t, e.Close()) + wg.Wait() +} + func TestPartiallyUnserializableThrowError(t *testing.T) { serializer := &influxSerializer.Serializer{} require.NoError(t, serializer.Init()) @@ -161,13 +217,20 @@ func TestMain(m *testing.M) { func runOutputConsumerProgram() { metricName := os.Getenv("METRIC_NAME") + expectedMetrics, err := strconv.Atoi(os.Getenv("METRIC_NUM")) + if err != nil { + fmt.Fprintf(os.Stderr, "could not parse METRIC_NUM\n") + //nolint:revive // error code is important for this "test" + os.Exit(1) + } parser := influx.NewStreamParser(os.Stdin) + numMetrics := 0 for { m, err := parser.Next() if err != nil { if errors.Is(err, influx.EOF) { - return // stream ended + break // stream ended } var parseErr *influx.ParseError if errors.As(err, &parseErr) { @@ -179,6 +242,7 @@ func runOutputConsumerProgram() { //nolint:revive // error code is important for this "test" os.Exit(1) } + numMetrics++ expected := testutil.MustMetric(metricName, map[string]string{"name": "cpu1"}, @@ -192,4 +256,9 @@ func runOutputConsumerProgram() { os.Exit(1) } } + if expectedMetrics != numMetrics { + fmt.Fprintf(os.Stderr, "number of metrics doesn't match expected: %v, %v\n", numMetrics, expectedMetrics) + //nolint:revive // error code is important for this "test" + os.Exit(1) + } } diff --git a/plugins/outputs/execd/sample.conf b/plugins/outputs/execd/sample.conf index 0ec0e2542..e1e4244c4 100644 --- a/plugins/outputs/execd/sample.conf +++ b/plugins/outputs/execd/sample.conf @@ -16,8 +16,13 @@ ## Flag to determine whether execd should throw error when part of metrics is unserializable ## Setting this to true will skip the unserializable metrics and process the rest of metrics ## Setting this to false will throw error when encountering unserializable metrics and none will be processed + ## This setting does not apply when use_batch_format is set. # ignore_serialization_error = false + ## Use batch serialization instead of per metric. The batch format allows for the + ## production of batch output formats and may more efficiently encode and write metrics. + # use_batch_format = false + ## Data format to export. ## Each data format has its own unique set of configuration options, read ## more about them here: