feat(outputs): Add framework to retry on startup errors (#14884)

This commit is contained in:
Sven Rebhan 2024-03-26 18:12:30 +01:00 committed by GitHub
parent 4344972d1a
commit aa030b569a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 405 additions and 30 deletions

View File

@ -793,10 +793,17 @@ func (a *Agent) startOutputs(
src := make(chan telegraf.Metric, 100)
unit := &outputUnit{src: src}
for _, output := range outputs {
err := a.connectOutput(ctx, output)
if err != nil {
for _, output := range unit.outputs {
if err := a.connectOutput(ctx, output); err != nil {
var fatalErr *internal.FatalError
if errors.As(err, &fatalErr) {
// If the model tells us to remove the plugin we do so without error
log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err)
output.Close()
continue
}
for _, unitOutput := range unit.outputs {
unitOutput.Close()
}
return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err)
}
@ -810,18 +817,14 @@ func (a *Agent) startOutputs(
// connectOutputs connects to all outputs.
func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error {
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was %q", output.LogName(), err)
if err := output.Connect(); err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err)
err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
if err := internal.SleepContext(ctx, 15*time.Second); err != nil {
return err
}
err = output.Output.Connect()
if err != nil {
if err = output.Connect(); err != nil {
return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
}
}

View File

@ -1486,6 +1486,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
if c.hasErrs() {
return nil, c.firstErr()
@ -1510,7 +1511,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
"order",
"pass", "period", "precision",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags":
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior":
// Secret-store options to ignore
case "id":

39
internal/errors.go Normal file
View File

@ -0,0 +1,39 @@
package internal
import "errors"
var ErrNotConnected = errors.New("not connected")
// StartupError indicates an error that occurred during startup of a plugin
// e.g. due to connectivity issues or resources being not yet available.
// In case the 'Retry' flag is set, the startup of the plugin might be retried
// depending on the configured startup-error-behavior. The 'RemovePlugin'
// flag denotes if the agent should remove the plugin from further processing.
type StartupError struct {
Err error
Retry bool
Partial bool
}
func (e *StartupError) Error() string {
return e.Err.Error()
}
func (e *StartupError) Unwrap() error {
return e.Err
}
// FatalError indicates a not-recoverable error in the plugin. The corresponding
// plugin should be remove by the agent stopping any further processing for that
// plugin instance.
type FatalError struct {
Err error
}
func (e *FatalError) Error() string {
return e.Err.Error()
}
func (e *FatalError) Unwrap() error {
return e.Err
}

View File

@ -1,11 +1,14 @@
package models
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
)
@ -19,10 +22,11 @@ const (
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Alias string
ID string
Filter Filter
Name string
Alias string
ID string
StartupErrorBehavior string
Filter Filter
FlushInterval time.Duration
FlushJitter time.Duration
@ -47,12 +51,16 @@ type RunningOutput struct {
MetricsFiltered selfstat.Stat
WriteTime selfstat.Stat
StartupErrors selfstat.Stat
BatchReady chan time.Time
buffer *Buffer
log telegraf.Logger
started bool
retries uint64
aggMutex sync.Mutex
}
@ -104,6 +112,11 @@ func NewRunningOutput(
"write_time_ns",
tags,
),
StartupErrors: selfstat.Register(
"write",
"startup_errors",
tags,
),
log: logger,
}
@ -119,7 +132,20 @@ func (r *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}
func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
}
return r.Config.ID
}
func (r *RunningOutput) Init() error {
switch r.Config.StartupErrorBehavior {
case "", "error", "retry", "ignore":
default:
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}
if p, ok := r.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
@ -129,11 +155,41 @@ func (r *RunningOutput) Init() error {
return nil
}
func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
func (r *RunningOutput) Connect() error {
// Try to connect and exit early on success
err := r.Output.Connect()
if err == nil {
r.started = true
return nil
}
r.StartupErrors.Incr(1)
// Check if the plugin reports a retry-able error, otherwise we exit.
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry {
return err
}
// Handle the retry-able error depending on the configured behavior
switch r.Config.StartupErrorBehavior {
case "", "error": // fall-trough to return the actual error
case "retry":
r.log.Infof("Connect failed: %v; retrying...", err)
return nil
case "ignore":
return &internal.FatalError{Err: serr}
default:
r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}
return err
}
// Close closes the output
func (r *RunningOutput) Close() {
if err := r.Output.Close(); err != nil {
r.log.Errorf("Error closing output: %v", err)
}
return r.Config.ID
}
// AddMetric adds a metric to the output.
@ -188,6 +244,22 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (r *RunningOutput) Write() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.log.Debugf("Partially connected after %d attempts", r.retries)
} else {
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
}
if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
r.aggMutex.Lock()
metrics := output.Push()
@ -220,6 +292,17 @@ func (r *RunningOutput) Write() error {
// WriteBatch writes a single batch of metrics to the output.
func (r *RunningOutput) WriteBatch() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
return nil
@ -235,14 +318,6 @@ func (r *RunningOutput) WriteBatch() error {
return nil
}
// Close closes the output
func (r *RunningOutput) Close() {
err := r.Output.Close()
if err != nil {
r.log.Errorf("Error closing output: %v", err)
}
}
func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&r.droppedMetrics)
if dropped > 0 {

View File

@ -2,6 +2,7 @@ package models
import (
"errors"
"fmt"
"sync"
"testing"
"time"
@ -9,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
)
@ -487,6 +489,7 @@ func TestInternalMetrics(t *testing.T) {
"metrics_filtered": 0,
"metrics_written": 0,
"write_time_ns": 0,
"startup_errors": 0,
},
time.Unix(0, 0),
),
@ -503,6 +506,243 @@ func TestInternalMetrics(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
func TestStartupBehaviorInvalid(t *testing.T) {
ro := NewRunningOutput(
&mockOutput{},
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: "foo",
},
5, 10,
)
require.ErrorContains(t, ro.Init(), "invalid 'startup_error_behavior'")
}
func TestRetryableStartupBehaviorDefault(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("retryable err"),
Retry: true,
}
ro := NewRunningOutput(
&mockOutput{
startupErrorCount: 1,
startupError: serr,
},
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
},
5, 10,
)
require.NoError(t, ro.Init())
// If Connect() fails, the agent will stop
require.ErrorIs(t, ro.Connect(), serr)
require.False(t, ro.started)
}
func TestRetryableStartupBehaviorError(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("retryable err"),
Retry: true,
}
ro := NewRunningOutput(
&mockOutput{
startupErrorCount: 1,
startupError: serr,
},
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: "error",
},
5, 10,
)
require.NoError(t, ro.Init())
// If Connect() fails, the agent will stop
require.ErrorIs(t, ro.Connect(), serr)
require.False(t, ro.started)
}
func TestRetryableStartupBehaviorRetry(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("retryable err"),
Retry: true,
}
mo := &mockOutput{
startupErrorCount: 2,
startupError: serr,
}
ro := NewRunningOutput(
mo,
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: "retry",
},
5, 10,
)
require.NoError(t, ro.Init())
// For retry, Connect() should succeed even though there is an error but
// should return an error on Write() until we successfully connect.
require.NoError(t, ro.Connect(), serr)
require.False(t, ro.started)
ro.AddMetric(testutil.TestMetric(1))
require.ErrorIs(t, ro.Write(), internal.ErrNotConnected)
require.False(t, ro.started)
ro.AddMetric(testutil.TestMetric(2))
require.NoError(t, ro.Write())
require.True(t, ro.started)
require.Equal(t, 1, mo.writes)
ro.AddMetric(testutil.TestMetric(3))
require.NoError(t, ro.Write())
require.True(t, ro.started)
require.Equal(t, 2, mo.writes)
}
func TestRetryableStartupBehaviorIgnore(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("retryable err"),
Retry: true,
}
mo := &mockOutput{
startupErrorCount: 2,
startupError: serr,
}
ro := NewRunningOutput(
mo,
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: "ignore",
},
5, 10,
)
require.NoError(t, ro.Init())
// For ignore, Connect() should return a fatal error if connection fails.
// This will force the agent to remove the plugin.
var fatalErr *internal.FatalError
require.ErrorAs(t, ro.Connect(), &fatalErr)
require.ErrorIs(t, fatalErr, serr)
require.False(t, ro.started)
}
func TestNonRetryableStartupBehaviorDefault(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("non-retryable err"),
Retry: false,
}
for _, behavior := range []string{"", "error", "retry", "ignore"} {
t.Run(behavior, func(t *testing.T) {
mo := &mockOutput{
startupErrorCount: 2,
startupError: serr,
}
ro := NewRunningOutput(
mo,
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: behavior,
},
5, 10,
)
require.NoError(t, ro.Init())
// Non-retryable error should pass through and in turn the agent
// will stop and exit.
require.ErrorIs(t, ro.Connect(), serr)
require.False(t, ro.started)
})
}
}
func TestUntypedtartupBehaviorIgnore(t *testing.T) {
serr := errors.New("untyped err")
for _, behavior := range []string{"", "error", "retry", "ignore"} {
t.Run(behavior, func(t *testing.T) {
mo := &mockOutput{
startupErrorCount: 2,
startupError: serr,
}
ro := NewRunningOutput(
mo,
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: behavior,
},
5, 10,
)
require.NoError(t, ro.Init())
// Untyped error should pass through and in turn the agent will
// stop and exit.
require.ErrorIs(t, ro.Connect(), serr)
require.False(t, ro.started)
})
}
}
func TestPartiallyStarted(t *testing.T) {
serr := &internal.StartupError{
Err: errors.New("partial err"),
Retry: true,
Partial: true,
}
mo := &mockOutput{
startupErrorCount: 2,
startupError: serr,
}
ro := NewRunningOutput(
mo,
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
StartupErrorBehavior: "retry",
},
5, 10,
)
require.NoError(t, ro.Init())
// For retry, Connect() should succeed even though there is an error but
// should return an error on Write() until we successfully connect.
require.NoError(t, ro.Connect(), serr)
require.False(t, ro.started)
ro.AddMetric(testutil.TestMetric(1))
require.NoError(t, ro.Write())
require.False(t, ro.started)
require.Equal(t, 1, mo.writes)
ro.AddMetric(testutil.TestMetric(2))
require.NoError(t, ro.Write())
require.True(t, ro.started)
require.Equal(t, 2, mo.writes)
ro.AddMetric(testutil.TestMetric(3))
require.NoError(t, ro.Write())
require.True(t, ro.started)
require.Equal(t, 3, mo.writes)
}
type mockOutput struct {
sync.Mutex
@ -510,10 +750,20 @@ type mockOutput struct {
// if true, mock write failure
failWrite bool
startupError error
startupErrorCount int
writes int
}
func (m *mockOutput) Connect() error {
return nil
if m.startupErrorCount == 0 {
return nil
}
if m.startupErrorCount > 0 {
m.startupErrorCount--
}
return m.startupError
}
func (m *mockOutput) Close() error {
@ -529,6 +779,9 @@ func (m *mockOutput) SampleConfig() string {
}
func (m *mockOutput) Write(metrics []telegraf.Metric) error {
fmt.Println("writing")
m.writes++
m.Lock()
defer m.Unlock()
if m.failWrite {

View File

@ -12,6 +12,7 @@ import (
"github.com/gofrs/uuid/v5"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/outputs"
@ -157,13 +158,16 @@ func (k *Kafka) Init() error {
func (k *Kafka) Connect() error {
producer, err := k.producerFunc(k.Brokers, k.saramaConfig)
if err != nil {
return err
return &internal.StartupError{Err: err, Retry: true}
}
k.producer = producer
return nil
}
func (k *Kafka) Close() error {
if k.producer == nil {
return nil
}
return k.producer.Close()
}