diff --git a/agent/agent.go b/agent/agent.go index ab390a5ac..0a67d5501 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -375,6 +375,12 @@ func (*Agent) startInputs(dst chan<- telegraf.Metric, inputs []*models.RunningIn return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err) } + if err := input.Probe(); err != nil { + // Probe failures are non-fatal to the agent but should only remove the plugin + log.Printf("I! [agent] Failed to probe %s, shutting down plugin: %s", input.LogName(), err) + input.Stop() + continue + } unit.inputs = append(unit.inputs, input) } diff --git a/models/running_input.go b/models/running_input.go index 89f42f28c..62ab73e70 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -112,7 +112,7 @@ func (r *RunningInput) LogName() string { func (r *RunningInput) Init() error { switch r.Config.StartupErrorBehavior { - case "", "error", "retry", "ignore": + case "", "error", "retry", "ignore", "probe": default: return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) } @@ -161,7 +161,7 @@ func (r *RunningInput) Start(acc telegraf.Accumulator) error { } r.log.Infof("Startup failed: %v; retrying...", err) return nil - case "ignore": + case "ignore", "probe": return &internal.FatalError{Err: serr} default: r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) @@ -170,6 +170,14 @@ func (r *RunningInput) Start(acc telegraf.Accumulator) error { return err } +func (r *RunningInput) Probe() error { + p, ok := r.Input.(telegraf.ProbePlugin) + if !ok || r.Config.StartupErrorBehavior != "probe" { + return nil + } + return p.Probe() +} + func (r *RunningInput) Stop() { if plugin, ok := r.Input.(telegraf.ServiceInput); ok { plugin.Stop() diff --git a/models/running_input_test.go b/models/running_input_test.go index 8c60987d7..0e306634d 100644 --- a/models/running_input_test.go +++ b/models/running_input_test.go @@ -1,6 +1,7 @@ package models import ( + "errors" "testing" "time" @@ -496,3 +497,64 @@ func (*mockInput) SampleConfig() string { func (*mockInput) Gather(telegraf.Accumulator) error { return nil } + +func TestRunningInputProbingFailure(t *testing.T) { + ri := NewRunningInput(&mockProbingInput{ + probeReturn: errors.New("probing error"), + }, &InputConfig{ + Name: "TestRunningInput", + StartupErrorBehavior: "probe", + }) + ri.log = testutil.Logger{} + require.Error(t, ri.Probe()) +} + +func TestRunningInputProbingSuccess(t *testing.T) { + probeErr := errors.New("probing error") + for _, tt := range []struct { + name string + input telegraf.Input + startupErrorBehavior string + }{ + { + name: "non-probing plugin with probe value set", + input: &mockInput{}, + startupErrorBehavior: "probe", + }, + { + name: "non-probing plugin with probe value not set", + input: &mockInput{}, + startupErrorBehavior: "ignore", + }, + { + name: "probing plugin with probe value not set", + input: &mockProbingInput{probeErr}, + startupErrorBehavior: "ignore", + }, + } { + t.Run(tt.name, func(t *testing.T) { + ri := NewRunningInput(tt.input, &InputConfig{ + Name: "TestRunningInput", + StartupErrorBehavior: tt.startupErrorBehavior, + }) + ri.log = testutil.Logger{} + require.NoError(t, ri.Probe()) + }) + } +} + +type mockProbingInput struct { + probeReturn error +} + +func (m *mockProbingInput) SampleConfig() string { + return "" +} + +func (m *mockProbingInput) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (m *mockProbingInput) Probe() error { + return m.probeReturn +} diff --git a/plugin.go b/plugin.go index 171b446e6..197b59d2f 100644 --- a/plugin.go +++ b/plugin.go @@ -56,3 +56,9 @@ type StatefulPlugin interface { // initialization (after Init() function). SetState(state interface{}) error } + +// ProbePlugin is an interface that all input/output plugins need to +// implement in order to support the `probe` value of `startup_error_behavior` +type ProbePlugin interface { + Probe() error +}