diff --git a/plugins/outputs/execd/README.md b/plugins/outputs/execd/README.md index 314ee5e82..1b8a45159 100644 --- a/plugins/outputs/execd/README.md +++ b/plugins/outputs/execd/README.md @@ -22,6 +22,11 @@ Telegraf minimum version: Telegraf 1.15.0 ## Delay before the process is restarted after an unexpected termination restart_delay = "10s" + ## Flag to determine whether execd should throw error when part of metrics is unserializable + ## Setting this to true will skip the unserializable metrics and process the rest of metrics + ## Setting this to false will throw error when encountering unserializable metrics and none will be processed + # ignore_serialization_error = false + ## Data format to export. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/execd/execd.go b/plugins/outputs/execd/execd.go index ba979db50..2d8ac42ec 100644 --- a/plugins/outputs/execd/execd.go +++ b/plugins/outputs/execd/execd.go @@ -22,10 +22,11 @@ import ( var sampleConfig string type Execd struct { - Command []string `toml:"command"` - Environment []string `toml:"environment"` - RestartDelay config.Duration `toml:"restart_delay"` - Log telegraf.Logger + Command []string `toml:"command"` + Environment []string `toml:"environment"` + RestartDelay config.Duration `toml:"restart_delay"` + IgnoreSerializationError bool `toml:"ignore_serialization_error"` + Log telegraf.Logger process *process.Process serializer serializers.Serializer @@ -82,7 +83,11 @@ func (e *Execd) Write(metrics []telegraf.Metric) error { for _, m := range metrics { b, err := e.serializer.Serialize(m) if err != nil { - return fmt.Errorf("error serializing metrics: %s", err) + if !e.IgnoreSerializationError { + return fmt.Errorf("error serializing metrics: %w", err) + } + e.Log.Error("Skipping metric due to a serialization error: %w", err) + continue } if _, err = e.process.Stdin.Write(b); err != nil { diff --git a/plugins/outputs/execd/execd_test.go b/plugins/outputs/execd/execd_test.go index ff2ef5b92..b4653db4c 100644 --- a/plugins/outputs/execd/execd_test.go +++ b/plugins/outputs/execd/execd_test.go @@ -70,6 +70,80 @@ func TestExternalOutputWorks(t *testing.T) { wg.Wait() } +func TestPartiallyUnserializableThrowError(t *testing.T) { + influxSerializer, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + + exe, err := os.Executable() + require.NoError(t, err) + + e := &Execd{ + Command: []string{exe, "-testoutput"}, + Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, + RestartDelay: config.Duration(5 * time.Second), + IgnoreSerializationError: false, + serializer: influxSerializer, + Log: testutil.Logger{}, + } + + require.NoError(t, e.Init()) + + m1 := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + m2 := metric.New( + "cpu", + map[string]string{"name": "cpu2"}, + map[string]interface{}{}, + now, + ) + + require.NoError(t, e.Connect()) + require.Error(t, e.Write([]telegraf.Metric{m1, m2})) + require.NoError(t, e.Close()) +} + +func TestPartiallyUnserializableCanBeSkipped(t *testing.T) { + influxSerializer, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + + exe, err := os.Executable() + require.NoError(t, err) + + e := &Execd{ + Command: []string{exe, "-testoutput"}, + Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"}, + RestartDelay: config.Duration(5 * time.Second), + IgnoreSerializationError: true, + serializer: influxSerializer, + Log: testutil.Logger{}, + } + + require.NoError(t, e.Init()) + + m1 := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + m2 := metric.New( + "cpu", + map[string]string{"name": "cpu2"}, + map[string]interface{}{}, + now, + ) + + require.NoError(t, e.Connect()) + require.NoError(t, e.Write([]telegraf.Metric{m1, m2})) + require.NoError(t, e.Close()) +} + var testoutput = flag.Bool("testoutput", false, "if true, act like line input program instead of test") diff --git a/plugins/outputs/execd/sample.conf b/plugins/outputs/execd/sample.conf index 2d1315840..0ec0e2542 100644 --- a/plugins/outputs/execd/sample.conf +++ b/plugins/outputs/execd/sample.conf @@ -13,6 +13,11 @@ ## Delay before the process is restarted after an unexpected termination restart_delay = "10s" + ## Flag to determine whether execd should throw error when part of metrics is unserializable + ## Setting this to true will skip the unserializable metrics and process the rest of metrics + ## Setting this to false will throw error when encountering unserializable metrics and none will be processed + # ignore_serialization_error = false + ## Data format to export. ## Each data format has its own unique set of configuration options, read ## more about them here: