feat(outputs.execd): Add option for batch format (#13673)

This commit is contained in:
Chase Sterling 2023-07-26 14:53:02 -04:00 committed by GitHub
parent f804d6395e
commit 087a6683f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 2 deletions

View File

@ -34,8 +34,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Flag to determine whether execd should throw error when part of metrics is unserializable ## Flag to determine whether execd should throw error when part of metrics is unserializable
## Setting this to true will skip the unserializable metrics and process the rest of metrics ## Setting this to true will skip the unserializable metrics and process the rest of metrics
## Setting this to false will throw error when encountering unserializable metrics and none will be processed ## Setting this to false will throw error when encountering unserializable metrics and none will be processed
## This setting does not apply when use_batch_format is set.
# ignore_serialization_error = false # ignore_serialization_error = false
## Use batch serialization instead of per metric. The batch format allows for the
## production of batch output formats and may more efficiently encode and write metrics.
# use_batch_format = false
## Data format to export. ## Data format to export.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -24,6 +24,7 @@ type Execd struct {
Environment []string `toml:"environment"` Environment []string `toml:"environment"`
RestartDelay config.Duration `toml:"restart_delay"` RestartDelay config.Duration `toml:"restart_delay"`
IgnoreSerializationError bool `toml:"ignore_serialization_error"` IgnoreSerializationError bool `toml:"ignore_serialization_error"`
UseBatchFormat bool `toml:"use_batch_format"`
Log telegraf.Logger Log telegraf.Logger
process *process.Process process *process.Process
@ -78,6 +79,17 @@ func (e *Execd) Close() error {
} }
func (e *Execd) Write(metrics []telegraf.Metric) error { func (e *Execd) Write(metrics []telegraf.Metric) error {
if e.UseBatchFormat {
b, err := e.serializer.SerializeBatch(metrics)
if err != nil {
return fmt.Errorf("error serializing metrics: %w", err)
}
if _, err = e.process.Stdin.Write(b); err != nil {
return fmt.Errorf("error writing metrics: %w", err)
}
return nil
}
for _, m := range metrics { for _, m := range metrics {
b, err := e.serializer.Serialize(m) b, err := e.serializer.Serialize(m)
if err != nil { if err != nil {

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"strconv"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -33,7 +34,7 @@ func TestExternalOutputWorks(t *testing.T) {
e := &Execd{ e := &Execd{
Command: []string{exe, "-testoutput"}, Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu", "METRIC_NUM=1"},
RestartDelay: config.Duration(5 * time.Second), RestartDelay: config.Duration(5 * time.Second),
serializer: serializer, serializer: serializer,
Log: testutil.Logger{}, Log: testutil.Logger{},
@ -71,6 +72,61 @@ func TestExternalOutputWorks(t *testing.T) {
wg.Wait() wg.Wait()
} }
// TestBatchOutputWorks verifies that, with UseBatchFormat enabled, the plugin
// hands all metrics to the external process in a single serialized batch.
// The consumer program checks it receives exactly METRIC_NUM metrics.
func TestBatchOutputWorks(t *testing.T) {
	s := &influxSerializer.Serializer{}
	require.NoError(t, s.Init())

	exe, err := os.Executable()
	require.NoError(t, err)

	plugin := &Execd{
		Command:        []string{exe, "-testoutput"},
		Environment:    []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu", "METRIC_NUM=2"},
		RestartDelay:   config.Duration(5 * time.Second),
		UseBatchFormat: true,
		serializer:     s,
		Log:            testutil.Logger{},
	}
	require.NoError(t, plugin.Init())

	var wg sync.WaitGroup
	wg.Add(1)
	plugin.process.ReadStderrFn = func(rstderr io.Reader) {
		defer wg.Done()
		scanner := bufio.NewScanner(rstderr)
		for scanner.Scan() {
			t.Errorf("stderr: %q", scanner.Text())
		}
		// A "file already closed" error is expected when the process exits.
		if err := scanner.Err(); err != nil && !strings.HasSuffix(err.Error(), "already closed") {
			t.Errorf("error reading stderr: %v", err)
		}
	}

	// Build two identical metrics so the consumer's METRIC_NUM=2 check passes.
	batch := make([]telegraf.Metric, 0, 2)
	for i := 0; i < 2; i++ {
		batch = append(batch, metric.New(
			"cpu",
			map[string]string{"name": "cpu1"},
			map[string]interface{}{"idle": 50, "sys": 30},
			now,
		))
	}

	require.NoError(t, plugin.Connect())
	require.NoError(t, plugin.Write(batch))
	require.NoError(t, plugin.Close())
	wg.Wait()
}
func TestPartiallyUnserializableThrowError(t *testing.T) { func TestPartiallyUnserializableThrowError(t *testing.T) {
serializer := &influxSerializer.Serializer{} serializer := &influxSerializer.Serializer{}
require.NoError(t, serializer.Init()) require.NoError(t, serializer.Init())
@ -161,13 +217,20 @@ func TestMain(m *testing.M) {
func runOutputConsumerProgram() { func runOutputConsumerProgram() {
metricName := os.Getenv("METRIC_NAME") metricName := os.Getenv("METRIC_NAME")
expectedMetrics, err := strconv.Atoi(os.Getenv("METRIC_NUM"))
if err != nil {
fmt.Fprintf(os.Stderr, "could not parse METRIC_NUM\n")
//nolint:revive // error code is important for this "test"
os.Exit(1)
}
parser := influx.NewStreamParser(os.Stdin) parser := influx.NewStreamParser(os.Stdin)
numMetrics := 0
for { for {
m, err := parser.Next() m, err := parser.Next()
if err != nil { if err != nil {
if errors.Is(err, influx.EOF) { if errors.Is(err, influx.EOF) {
return // stream ended break // stream ended
} }
var parseErr *influx.ParseError var parseErr *influx.ParseError
if errors.As(err, &parseErr) { if errors.As(err, &parseErr) {
@ -179,6 +242,7 @@ func runOutputConsumerProgram() {
//nolint:revive // error code is important for this "test" //nolint:revive // error code is important for this "test"
os.Exit(1) os.Exit(1)
} }
numMetrics++
expected := testutil.MustMetric(metricName, expected := testutil.MustMetric(metricName,
map[string]string{"name": "cpu1"}, map[string]string{"name": "cpu1"},
@ -192,4 +256,9 @@ func runOutputConsumerProgram() {
os.Exit(1) os.Exit(1)
} }
} }
if expectedMetrics != numMetrics {
fmt.Fprintf(os.Stderr, "number of metrics doesn't match expected: %v, %v\n", numMetrics, expectedMetrics)
//nolint:revive // error code is important for this "test"
os.Exit(1)
}
} }

View File

@ -16,8 +16,13 @@
## Flag to determine whether execd should throw error when part of metrics is unserializable ## Flag to determine whether execd should throw error when part of metrics is unserializable
## Setting this to true will skip the unserializable metrics and process the rest of metrics ## Setting this to true will skip the unserializable metrics and process the rest of metrics
## Setting this to false will throw error when encountering unserializable metrics and none will be processed ## Setting this to false will throw error when encountering unserializable metrics and none will be processed
## This setting does not apply when use_batch_format is set.
# ignore_serialization_error = false # ignore_serialization_error = false
## Use batch serialization instead of per metric. The batch format allows for the
## production of batch output formats and may more efficiently encode and write metrics.
# use_batch_format = false
## Data format to export. ## Data format to export.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here: