fix(outputs.execd): Fixing the execd behavior to not throw error when partially unserializable metrics are written. (#11767)

This commit is contained in:
Lucas Segawa 2022-09-09 23:00:47 +09:00 committed by GitHub
parent ca32cdc6e6
commit 8d416c4b7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 5 deletions

View File

@ -22,6 +22,11 @@ Telegraf minimum version: Telegraf 1.15.0
## Delay before the process is restarted after an unexpected termination ## Delay before the process is restarted after an unexpected termination
restart_delay = "10s" restart_delay = "10s"
## Flag to determine whether execd should throw error when part of metrics is unserializable
## Setting this to true will skip the unserializable metrics and process the rest of metrics
## Setting this to false will throw error when encountering unserializable metrics and none will be processed
# ignore_serialization_error = false
## Data format to export. ## Data format to export.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -22,10 +22,11 @@ import (
var sampleConfig string var sampleConfig string
type Execd struct { type Execd struct {
Command []string `toml:"command"` Command []string `toml:"command"`
Environment []string `toml:"environment"` Environment []string `toml:"environment"`
RestartDelay config.Duration `toml:"restart_delay"` RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger IgnoreSerializationError bool `toml:"ignore_serialization_error"`
Log telegraf.Logger
process *process.Process process *process.Process
serializer serializers.Serializer serializer serializers.Serializer
@ -82,7 +83,11 @@ func (e *Execd) Write(metrics []telegraf.Metric) error {
for _, m := range metrics { for _, m := range metrics {
b, err := e.serializer.Serialize(m) b, err := e.serializer.Serialize(m)
if err != nil { if err != nil {
return fmt.Errorf("error serializing metrics: %s", err) if !e.IgnoreSerializationError {
return fmt.Errorf("error serializing metrics: %w", err)
}
e.Log.Error("Skipping metric due to a serialization error: %w", err)
continue
} }
if _, err = e.process.Stdin.Write(b); err != nil { if _, err = e.process.Stdin.Write(b); err != nil {

View File

@ -70,6 +70,80 @@ func TestExternalOutputWorks(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestPartiallyUnserializableThrowError(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
exe, err := os.Executable()
require.NoError(t, err)
e := &Execd{
Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
IgnoreSerializationError: false,
serializer: influxSerializer,
Log: testutil.Logger{},
}
require.NoError(t, e.Init())
m1 := metric.New(
"cpu",
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
)
m2 := metric.New(
"cpu",
map[string]string{"name": "cpu2"},
map[string]interface{}{},
now,
)
require.NoError(t, e.Connect())
require.Error(t, e.Write([]telegraf.Metric{m1, m2}))
require.NoError(t, e.Close())
}
func TestPartiallyUnserializableCanBeSkipped(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
exe, err := os.Executable()
require.NoError(t, err)
e := &Execd{
Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
IgnoreSerializationError: true,
serializer: influxSerializer,
Log: testutil.Logger{},
}
require.NoError(t, e.Init())
m1 := metric.New(
"cpu",
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
)
m2 := metric.New(
"cpu",
map[string]string{"name": "cpu2"},
map[string]interface{}{},
now,
)
require.NoError(t, e.Connect())
require.NoError(t, e.Write([]telegraf.Metric{m1, m2}))
require.NoError(t, e.Close())
}
var testoutput = flag.Bool("testoutput", false, var testoutput = flag.Bool("testoutput", false,
"if true, act like line input program instead of test") "if true, act like line input program instead of test")

View File

@ -13,6 +13,11 @@
## Delay before the process is restarted after an unexpected termination ## Delay before the process is restarted after an unexpected termination
restart_delay = "10s" restart_delay = "10s"
## Flag to determine whether execd should throw error when part of metrics is unserializable
## Setting this to true will skip the unserializable metrics and process the rest of metrics
## Setting this to false will throw error when encountering unserializable metrics and none will be processed
# ignore_serialization_error = false
## Data format to export. ## Data format to export.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here: