From aa030b569af01835cfe302565d428a4bc30f3363 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 26 Mar 2024 18:12:30 +0100 Subject: [PATCH] feat(outputs): Add framework to retry on startup errors (#14884) --- agent/agent.go | 25 ++-- config/config.go | 3 +- internal/errors.go | 39 +++++ models/running_output.go | 107 +++++++++++--- models/running_output_test.go | 255 ++++++++++++++++++++++++++++++++- plugins/outputs/kafka/kafka.go | 6 +- 6 files changed, 405 insertions(+), 30 deletions(-) create mode 100644 internal/errors.go diff --git a/agent/agent.go b/agent/agent.go index f4cf8a2c3..a60b813d4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -793,10 +793,17 @@ func (a *Agent) startOutputs( src := make(chan telegraf.Metric, 100) unit := &outputUnit{src: src} for _, output := range outputs { - err := a.connectOutput(ctx, output) - if err != nil { - for _, output := range unit.outputs { + if err := a.connectOutput(ctx, output); err != nil { + var fatalErr *internal.FatalError + if errors.As(err, &fatalErr) { + // If the model tells us to remove the plugin we do so without error + log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err) output.Close() + continue + } + + for _, unitOutput := range unit.outputs { + unitOutput.Close() } return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err) } @@ -810,18 +817,14 @@ func (a *Agent) startOutputs( // connectOutputs connects to all outputs. func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error { log.Printf("D! [agent] Attempting connection to [%s]", output.LogName()) - err := output.Output.Connect() - if err != nil { - log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+ - "error was %q", output.LogName(), err) + if err := output.Connect(); err != nil { + log.Printf("E! 
[agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err) - err := internal.SleepContext(ctx, 15*time.Second) - if err != nil { + if err := internal.SleepContext(ctx, 15*time.Second); err != nil { return err } - err = output.Output.Connect() - if err != nil { + if err = output.Connect(); err != nil { return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err) } } diff --git a/config/config.go b/config/config.go index 844581ffc..2a255a1ea 100644 --- a/config/config.go +++ b/config/config.go @@ -1486,6 +1486,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_override", &oc.NameOverride) c.getFieldString(tbl, "name_suffix", &oc.NameSuffix) c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) + c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) if c.hasErrs() { return nil, c.firstErr() @@ -1510,7 +1511,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator", "order", "pass", "period", "precision", - "tagdrop", "tagexclude", "taginclude", "tagpass", "tags": + "tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior": // Secret-store options to ignore case "id": diff --git a/internal/errors.go b/internal/errors.go new file mode 100644 index 000000000..a1f58c3eb --- /dev/null +++ b/internal/errors.go @@ -0,0 +1,39 @@ +package internal + +import "errors" + +var ErrNotConnected = errors.New("not connected") + +// StartupError indicates an error that occurred during startup of a plugin +// e.g. due to connectivity issues or resources being not yet available. +// In case the 'Retry' flag is set, the startup of the plugin might be retried +// depending on the configured startup-error-behavior. 
The 'Partial' +// flag denotes that the plugin started only partially and can already be used for writing. +type StartupError struct { + Err error + Retry bool + Partial bool +} + +func (e *StartupError) Error() string { + return e.Err.Error() +} + +func (e *StartupError) Unwrap() error { + return e.Err +} + +// FatalError indicates a not-recoverable error in the plugin. The corresponding +// plugin should be removed by the agent stopping any further processing for that +// plugin instance. +type FatalError struct { + Err error +} + +func (e *FatalError) Error() string { + return e.Err.Error() +} + +func (e *FatalError) Unwrap() error { + return e.Err +} diff --git a/models/running_output.go b/models/running_output.go index 36bd53abe..8010cc8da 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -1,11 +1,14 @@ package models import ( + "errors" + "fmt" "sync" "sync/atomic" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/selfstat" ) @@ -19,10 +22,11 @@ const ( // OutputConfig containing name and filter type OutputConfig struct { - Name string - Alias string - ID string - Filter Filter + Name string + Alias string + ID string + StartupErrorBehavior string + Filter Filter FlushInterval time.Duration FlushJitter time.Duration @@ -47,12 +51,16 @@ type RunningOutput struct { MetricsFiltered selfstat.Stat WriteTime selfstat.Stat + StartupErrors selfstat.Stat BatchReady chan time.Time buffer *Buffer log telegraf.Logger + started bool + retries uint64 + aggMutex sync.Mutex } @@ -104,6 +112,11 @@ func NewRunningOutput( "write_time_ns", tags, ), + StartupErrors: selfstat.Register( + "write", + "startup_errors", + tags, + ), log: logger, } @@ -119,7 +132,20 @@ func (r *RunningOutput) metricFiltered(metric telegraf.Metric) { metric.Drop() } +func (r *RunningOutput) ID() string { + if p, ok := r.Output.(telegraf.PluginWithID); ok { + return p.ID() + } + return r.Config.ID +} + func (r *RunningOutput) 
Init() error { + switch r.Config.StartupErrorBehavior { + case "", "error", "retry", "ignore": + default: + return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + if p, ok := r.Output.(telegraf.Initializer); ok { err := p.Init() if err != nil { @@ -129,11 +155,41 @@ func (r *RunningOutput) Init() error { return nil } -func (r *RunningOutput) ID() string { - if p, ok := r.Output.(telegraf.PluginWithID); ok { - return p.ID() +func (r *RunningOutput) Connect() error { + // Try to connect and exit early on success + err := r.Output.Connect() + if err == nil { + r.started = true + return nil + } + r.StartupErrors.Incr(1) + + // Check if the plugin reports a retry-able error, otherwise we exit. + var serr *internal.StartupError + if !errors.As(err, &serr) || !serr.Retry { + return err + } + + // Handle the retry-able error depending on the configured behavior + switch r.Config.StartupErrorBehavior { + case "", "error": // fall through to return the actual error + case "retry": + r.log.Infof("Connect failed: %v; retrying...", err) + return nil + case "ignore": + return &internal.FatalError{Err: serr} + default: + r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + + return err +} + +// Close closes the output +func (r *RunningOutput) Close() { + if err := r.Output.Close(); err != nil { + r.log.Errorf("Error closing output: %v", err) } - return r.Config.ID } // AddMetric adds a metric to the output. @@ -188,6 +244,22 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all metrics to the output, stopping when all have been sent on // or error. 
func (r *RunningOutput) Write() error { + // Try to connect if we are not yet started up + if !r.started { + r.retries++ + if err := r.Output.Connect(); err != nil { + var serr *internal.StartupError + if !errors.As(err, &serr) || !serr.Retry || !serr.Partial { + r.StartupErrors.Incr(1) + return internal.ErrNotConnected + } + r.log.Debugf("Partially connected after %d attempts", r.retries) + } else { + r.started = true + r.log.Debugf("Successfully connected after %d attempts", r.retries) + } + } + if output, ok := r.Output.(telegraf.AggregatingOutput); ok { r.aggMutex.Lock() metrics := output.Push() @@ -220,6 +292,17 @@ func (r *RunningOutput) Write() error { // WriteBatch writes a single batch of metrics to the output. func (r *RunningOutput) WriteBatch() error { + // Try to connect if we are not yet started up + if !r.started { + r.retries++ + if err := r.Output.Connect(); err != nil { + r.StartupErrors.Incr(1) + return internal.ErrNotConnected + } + r.started = true + r.log.Debugf("Successfully connected after %d attempts", r.retries) + } + batch := r.buffer.Batch(r.MetricBatchSize) if len(batch) == 0 { return nil @@ -235,14 +318,6 @@ func (r *RunningOutput) WriteBatch() error { return nil } -// Close closes the output -func (r *RunningOutput) Close() { - err := r.Output.Close() - if err != nil { - r.log.Errorf("Error closing output: %v", err) - } -} - func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { dropped := atomic.LoadInt64(&r.droppedMetrics) if dropped > 0 { diff --git a/models/running_output_test.go b/models/running_output_test.go index 013ecbe60..db498a0c8 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -2,6 +2,7 @@ package models import ( "errors" + "fmt" "sync" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/selfstat" 
"github.com/influxdata/telegraf/testutil" ) @@ -487,6 +489,7 @@ func TestInternalMetrics(t *testing.T) { "metrics_filtered": 0, "metrics_written": 0, "write_time_ns": 0, + "startup_errors": 0, }, time.Unix(0, 0), ), @@ -503,6 +506,243 @@ func TestInternalMetrics(t *testing.T) { testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) } +func TestStartupBehaviorInvalid(t *testing.T) { + ro := NewRunningOutput( + &mockOutput{}, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "foo", + }, + 5, 10, + ) + require.ErrorContains(t, ro.Init(), "invalid 'startup_error_behavior'") +} + +func TestRetryableStartupBehaviorDefault(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + ro := NewRunningOutput( + &mockOutput{ + startupErrorCount: 1, + startupError: serr, + }, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // If Connect() fails, the agent will stop + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) +} + +func TestRetryableStartupBehaviorError(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + ro := NewRunningOutput( + &mockOutput{ + startupErrorCount: 1, + startupError: serr, + }, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "error", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // If Connect() fails, the agent will stop + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) +} + +func TestRetryableStartupBehaviorRetry(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: 
"test_alias", + StartupErrorBehavior: "retry", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For retry, Connect() should succeed even though there is an error but + // should return an error on Write() until we successfully connect. + require.NoError(t, ro.Connect(), serr) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(1)) + require.ErrorIs(t, ro.Write(), internal.ErrNotConnected) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(2)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 1, mo.writes) + + ro.AddMetric(testutil.TestMetric(3)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 2, mo.writes) +} + +func TestRetryableStartupBehaviorIgnore(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("retryable err"), + Retry: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "ignore", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For ignore, Connect() should return a fatal error if connection fails. + // This will force the agent to remove the plugin. 
+ var fatalErr *internal.FatalError + require.ErrorAs(t, ro.Connect(), &fatalErr) + require.ErrorIs(t, fatalErr, serr) + require.False(t, ro.started) +} + +func TestNonRetryableStartupBehaviorDefault(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("non-retryable err"), + Retry: false, + } + + for _, behavior := range []string{"", "error", "retry", "ignore"} { + t.Run(behavior, func(t *testing.T) { + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: behavior, + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // Non-retryable error should pass through and in turn the agent + // will stop and exit. + require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) + }) + } +} + +func TestUntypedtartupBehaviorIgnore(t *testing.T) { + serr := errors.New("untyped err") + + for _, behavior := range []string{"", "error", "retry", "ignore"} { + t.Run(behavior, func(t *testing.T) { + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: behavior, + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // Untyped error should pass through and in turn the agent will + // stop and exit. 
+ require.ErrorIs(t, ro.Connect(), serr) + require.False(t, ro.started) + }) + } +} + +func TestPartiallyStarted(t *testing.T) { + serr := &internal.StartupError{ + Err: errors.New("partial err"), + Retry: true, + Partial: true, + } + mo := &mockOutput{ + startupErrorCount: 2, + startupError: serr, + } + ro := NewRunningOutput( + mo, + &OutputConfig{ + Filter: Filter{}, + Name: "test_name", + Alias: "test_alias", + StartupErrorBehavior: "retry", + }, + 5, 10, + ) + require.NoError(t, ro.Init()) + + // For a partial start, Connect() should succeed even though there is an error and + // Write() should already succeed while the connection is still being retried. + require.NoError(t, ro.Connect(), serr) + require.False(t, ro.started) + + ro.AddMetric(testutil.TestMetric(1)) + require.NoError(t, ro.Write()) + require.False(t, ro.started) + require.Equal(t, 1, mo.writes) + + ro.AddMetric(testutil.TestMetric(2)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 2, mo.writes) + + ro.AddMetric(testutil.TestMetric(3)) + require.NoError(t, ro.Write()) + require.True(t, ro.started) + require.Equal(t, 3, mo.writes) +} + type mockOutput struct { sync.Mutex @@ -510,10 +750,20 @@ type mockOutput struct { // if true, mock write failure failWrite bool + + startupError error + startupErrorCount int + writes int } func (m *mockOutput) Connect() error { - return nil + if m.startupErrorCount == 0 { + return nil + } + if m.startupErrorCount > 0 { + m.startupErrorCount-- + } + return m.startupError } func (m *mockOutput) Close() error { @@ -529,6 +779,9 @@ func (m *mockOutput) SampleConfig() string { } func (m *mockOutput) Write(metrics []telegraf.Metric) error { + fmt.Println("writing") + m.writes++ + m.Lock() defer m.Unlock() if m.failWrite { diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 184ed3b80..13440f7ca 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -12,6 +12,7 @@ import ( 
"github.com/gofrs/uuid/v5" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" @@ -157,13 +158,16 @@ func (k *Kafka) Init() error { func (k *Kafka) Connect() error { producer, err := k.producerFunc(k.Brokers, k.saramaConfig) if err != nil { - return err + return &internal.StartupError{Err: err, Retry: true} } k.producer = producer return nil } func (k *Kafka) Close() error { + if k.producer == nil { + return nil + } return k.producer.Close() }