feat(agent): Add support for input probing (#16333)

This commit is contained in:
Landon Clipp 2025-01-21 09:21:25 -06:00 committed by GitHub
parent e57f48f608
commit 01aa1a39cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 84 additions and 2 deletions

View File

@ -375,6 +375,12 @@ func (*Agent) startInputs(dst chan<- telegraf.Metric, inputs []*models.RunningIn
return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err)
}
if err := input.Probe(); err != nil {
// Probe failures are non-fatal to the agent but should only remove the plugin
log.Printf("I! [agent] Failed to probe %s, shutting down plugin: %s", input.LogName(), err)
input.Stop()
continue
}
unit.inputs = append(unit.inputs, input)
}

View File

@ -112,7 +112,7 @@ func (r *RunningInput) LogName() string {
func (r *RunningInput) Init() error {
switch r.Config.StartupErrorBehavior {
case "", "error", "retry", "ignore":
case "", "error", "retry", "ignore", "probe":
default:
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}
@ -161,7 +161,7 @@ func (r *RunningInput) Start(acc telegraf.Accumulator) error {
}
r.log.Infof("Startup failed: %v; retrying...", err)
return nil
case "ignore":
case "ignore", "probe":
return &internal.FatalError{Err: serr}
default:
r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
@ -170,6 +170,14 @@ func (r *RunningInput) Start(acc telegraf.Accumulator) error {
return err
}
func (r *RunningInput) Probe() error {
p, ok := r.Input.(telegraf.ProbePlugin)
if !ok || r.Config.StartupErrorBehavior != "probe" {
return nil
}
return p.Probe()
}
func (r *RunningInput) Stop() {
if plugin, ok := r.Input.(telegraf.ServiceInput); ok {
plugin.Stop()

View File

@ -1,6 +1,7 @@
package models
import (
"errors"
"testing"
"time"
@ -496,3 +497,64 @@ func (*mockInput) SampleConfig() string {
func (*mockInput) Gather(telegraf.Accumulator) error {
return nil
}
func TestRunningInputProbingFailure(t *testing.T) {
ri := NewRunningInput(&mockProbingInput{
probeReturn: errors.New("probing error"),
}, &InputConfig{
Name: "TestRunningInput",
StartupErrorBehavior: "probe",
})
ri.log = testutil.Logger{}
require.Error(t, ri.Probe())
}
func TestRunningInputProbingSuccess(t *testing.T) {
probeErr := errors.New("probing error")
for _, tt := range []struct {
name string
input telegraf.Input
startupErrorBehavior string
}{
{
name: "non-probing plugin with probe value set",
input: &mockInput{},
startupErrorBehavior: "probe",
},
{
name: "non-probing plugin with probe value not set",
input: &mockInput{},
startupErrorBehavior: "ignore",
},
{
name: "probing plugin with probe value not set",
input: &mockProbingInput{probeErr},
startupErrorBehavior: "ignore",
},
} {
t.Run(tt.name, func(t *testing.T) {
ri := NewRunningInput(tt.input, &InputConfig{
Name: "TestRunningInput",
StartupErrorBehavior: tt.startupErrorBehavior,
})
ri.log = testutil.Logger{}
require.NoError(t, ri.Probe())
})
}
}
type mockProbingInput struct {
probeReturn error
}
func (m *mockProbingInput) SampleConfig() string {
return ""
}
func (m *mockProbingInput) Gather(_ telegraf.Accumulator) error {
return nil
}
func (m *mockProbingInput) Probe() error {
return m.probeReturn
}

View File

@ -56,3 +56,9 @@ type StatefulPlugin interface {
// initialization (after Init() function).
SetState(state interface{}) error
}
// ProbePlugin is an interface that all input/output plugins need to
// implement in order to support the `probe` value of `startup_error_behavior`
type ProbePlugin interface {
Probe() error
}