feat(inputs.execd): Add option to not restart program on error (#15271)

This commit is contained in:
Sven Rebhan 2024-05-23 14:42:34 -04:00 committed by GitHub
parent 5e830fb29a
commit 12ab6dfb33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 118 additions and 27 deletions

View File

@ -23,6 +23,7 @@ type Process struct {
ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader)
RestartDelay time.Duration
StopOnError bool
Log telegraf.Logger
name string
@ -31,6 +32,8 @@ type Process struct {
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
sync.Mutex
}
// New creates a new process wrapper
@ -65,10 +68,10 @@ func (p *Process) Start() error {
p.mainLoopWg.Add(1)
go func() {
defer p.mainLoopWg.Done()
if err := p.cmdLoop(ctx); err != nil {
p.Log.Errorf("Process quit with message: %v", err)
}
p.mainLoopWg.Done()
}()
return nil
@ -81,12 +84,24 @@ func (p *Process) Stop() {
p.cancel()
}
// close stdin so the app can shut down gracefully.
if err := p.Stdin.Close(); err != nil {
if err := p.Stdin.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
p.Log.Errorf("Stdin closed with message: %v", err)
}
p.mainLoopWg.Wait()
}
func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}
func (p *Process) State() (state *os.ProcessState, running bool) {
p.Lock()
defer p.Unlock()
return p.Cmd.ProcessState, p.Cmd.ProcessState.ExitCode() == -1
}
func (p *Process) cmdStart() error {
p.Cmd = exec.Command(p.name, p.args...)
@ -119,15 +134,13 @@ func (p *Process) cmdStart() error {
return nil
}
func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}
// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
for {
err := p.cmdWait(ctx)
if err != nil && p.StopOnError {
return err
}
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
@ -184,7 +197,9 @@ func (p *Process) cmdWait(ctx context.Context) error {
wg.Done()
}()
p.Lock()
err := p.Cmd.Wait()
p.Unlock()
processCancel()
wg.Wait()
return err

View File

@ -59,20 +59,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"
## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"
## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
# buffer_size = "64Kib"
## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero error code)
# stop_on_error = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"
```
## Example

View File

@ -25,10 +25,11 @@ var sampleConfig string
type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
BufferSize config.Size `toml:"buffer_size"`
Signal string `toml:"signal"`
RestartDelay config.Duration `toml:"restart_delay"`
StopOnError bool `toml:"stop_on_error"`
Log telegraf.Logger `toml:"-"`
BufferSize config.Size `toml:"buffer_size"`
process *process.Process
acc telegraf.Accumulator
@ -59,10 +60,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.outputReader
e.process.ReadStderrFn = e.cmdReadErr
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.StopOnError = e.StopOnError
e.process.Log = e.Log
if err = e.process.Start(); err != nil {
// if there was only one argument, and it contained spaces, warn the user

View File

@ -51,7 +51,7 @@ func TestExternalInputWorks(t *testing.T) {
require.NoError(t, err)
e := &Execd{
Command: []string{exe, "-counter"},
Command: []string{exe, "-mode", "counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
Signal: "STDIN",
@ -159,6 +159,62 @@ test{handler="execd",quantile="0.5"} 42.0
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
func TestStopOnError(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)
plugin := &Execd{
Command: []string{exe, "-mode", "fail"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
StopOnError: true,
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.Eventually(t, func() bool {
_, running := plugin.process.State()
return !running
}, 3*time.Second, 100*time.Millisecond)
state, running := plugin.process.State()
require.False(t, running)
require.Equal(t, 42, state.ExitCode())
}
func TestStopOnErrorSuccess(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)
plugin := &Execd{
Command: []string{exe, "-mode", "success"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
StopOnError: true,
RestartDelay: config.Duration(100 * time.Millisecond),
Log: testutil.Logger{},
}
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Wait for at least two metric as this indicates the process was restarted
require.Eventually(t, func() bool {
return acc.NMetrics() > 1
}, 3*time.Second, 100*time.Millisecond)
}
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout)
defer to.Stop()
@ -189,20 +245,32 @@ func (tm *TestMetricMaker) Log() telegraf.Logger {
return logger.NewLogger("TestPlugin", "test", "")
}
var counter = flag.Bool("counter", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
var mode string
flag.StringVar(&mode, "mode", "counter", "determines the output when run as mockup program")
flag.Parse()
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if *counter && runMode == "application" {
operationMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if operationMode != "application" {
// Run the normal test mode
os.Exit(m.Run())
}
// Run as a mock program
switch mode {
case "counter":
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
os.Exit(0)
case "fail":
os.Exit(42)
case "success":
fmt.Println("test value=42i")
os.Exit(0)
}
code := m.Run()
os.Exit(code)
os.Exit(23)
}
func runCounterProgram() error {
@ -217,9 +285,7 @@ func runCounterProgram() error {
for scanner.Scan() {
m := metric.New(envMetricName,
map[string]string{},
map[string]interface{}{
"count": i,
},
map[string]interface{}{"count": i},
time.Now(),
)
i++

View File

@ -18,17 +18,21 @@
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"
## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"
## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
# buffer_size = "64Kib"
## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero error code)
# stop_on_error = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"