diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 66dc7eea0..ca87944c9 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -4,11 +4,8 @@ package exec import ( "bytes" _ "embed" - "errors" "fmt" - "io" "path/filepath" - "runtime" "strings" "sync" "time" @@ -48,18 +45,31 @@ type Exec struct { type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric type runner interface { - run(string, []string, time.Duration) ([]byte, []byte, error) + run(string) ([]byte, []byte, error) } type commandRunner struct { - debug bool + environment []string + timeout time.Duration + debug bool } func (*Exec) SampleConfig() string { return sampleConfig } -func (*Exec) Init() error { +func (e *Exec) Init() error { + // Legacy single command support + if e.Command != "" { + e.Commands = append(e.Commands, e.Command) + } + + e.runner = &commandRunner{ + environment: e.Environment, + timeout: time.Duration(e.Timeout), + debug: e.Log.Level().Includes(telegraf.Debug), + } + return nil } @@ -68,60 +78,93 @@ func (e *Exec) SetParser(parser telegraf.Parser) { unwrapped, ok := parser.(*models.RunningParser) if ok { if _, ok := unwrapped.Parser.(*nagios.Parser); ok { - e.exitCodeHandler = nagiosHandler + e.exitCodeHandler = func(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric { + return nagios.AddState(err, msg, metrics) + } e.parseDespiteError = true } } } func (e *Exec) Gather(acc telegraf.Accumulator) error { + commands := e.updateRunners() + var wg sync.WaitGroup - // Legacy single command support - if e.Command != "" { - e.Commands = append(e.Commands, e.Command) - e.Command = "" - } + for _, cmd := range commands { + wg.Add(1) - commands := make([]string, 0, len(e.Commands)) - for _, pattern := range e.Commands { - cmdAndArgs := strings.SplitN(pattern, " ", 2) - if len(cmdAndArgs) == 0 { - continue - } - - matches, err := filepath.Glob(cmdAndArgs[0]) - if err != nil { - acc.AddError(err) - continue - } - - if len(matches) == 0 { - // There were no matches with the glob pattern, so let's assume - // that the command is in PATH and just run it as it is - commands = append(commands, pattern) - } else { - // There were matches, so we'll append each match together with - // the arguments to the commands slice - for _, match := range matches { - if len(cmdAndArgs) == 1 { - commands = append(commands, match) - } else { - commands = append(commands, - strings.Join([]string{match, cmdAndArgs[1]}, " ")) - } - } - } - } - - wg.Add(len(commands)) - for _, command := range commands { - go e.processCommand(command, acc, &wg) + go func(c string) { + defer wg.Done() + acc.AddError(e.processCommand(acc, c)) + }(cmd) } wg.Wait() return nil } -func truncate(buf bytes.Buffer) bytes.Buffer { +func (e *Exec) updateRunners() []string { + commands := make([]string, 0, len(e.Commands)) + for _, pattern := range e.Commands { + if pattern == "" { + continue + } + + // Try to expand globbing expressions + cmd, args, found := strings.Cut(pattern, " ") + matches, err := filepath.Glob(cmd) + if err != nil { + e.Log.Errorf("Matching command %q failed: %v", cmd, err) + continue + } + + if len(matches) == 0 { + // There were no matches with the glob pattern, so let's assume + // the command is in PATH and just run it as it is + commands = append(commands, pattern) + } else { + // There were matches, so we'll append each match together with + // the arguments to the commands slice + for _, match := range matches { + if found { + match += " " + args + } + commands = append(commands, match) + } + } + } + + return commands +} + +func (e *Exec) processCommand(acc telegraf.Accumulator, cmd string) error { + out, errBuf, runErr := e.runner.run(cmd) + if !e.IgnoreError && !e.parseDespiteError && runErr != nil { + return fmt.Errorf("exec: %w for command %q: %s", runErr, cmd, string(errBuf)) + } + + metrics, err := e.parser.Parse(out) + if err != nil { + return err + } + + if len(metrics) == 0 { + once.Do(func() { + e.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + + if e.exitCodeHandler != nil { + metrics = e.exitCodeHandler(metrics, runErr, errBuf) + } + + for _, m := range metrics { + acc.AddMetric(m) + } + + return nil +} + +func truncate(buf *bytes.Buffer) { // Limit the number of bytes. didTruncate := false if buf.Len() > maxStderrBytes { @@ -138,72 +181,12 @@ func truncate(buf bytes.Buffer) bytes.Buffer { if didTruncate { buf.WriteString("...") } - return buf -} - -// removeWindowsCarriageReturns removes all carriage returns from the input if the -// OS is Windows. It does not return any errors. -func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer { - if runtime.GOOS == "windows" { - var buf bytes.Buffer - for { - byt, err := b.ReadBytes(0x0D) - byt = bytes.TrimRight(byt, "\x0d") - if len(byt) > 0 { - buf.Write(byt) - } - if errors.Is(err, io.EOF) { - return buf - } - } - } - return b -} - -func (e *Exec) processCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { - defer wg.Done() - - out, errBuf, runErr := e.runner.run(command, e.Environment, time.Duration(e.Timeout)) - if !e.IgnoreError && !e.parseDespiteError && runErr != nil { - err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf)) - acc.AddError(err) - return - } - - metrics, err := e.parser.Parse(out) - if err != nil { - acc.AddError(err) - return - } - - if len(metrics) == 0 { - once.Do(func() { - e.Log.Debug(internal.NoMetricsCreatedMsg) - }) - } - - if e.exitCodeHandler != nil { - metrics = e.exitCodeHandler(metrics, runErr, errBuf) - } - - for _, m := range metrics { - acc.AddMetric(m) - } -} - -func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric { - return nagios.AddState(err, msg, metrics) -} - -func newExec() *Exec { - return &Exec{ - runner: commandRunner{}, - Timeout: config.Duration(time.Second * 5), - } } func init() { inputs.Add("exec", func() telegraf.Input { - return newExec() + return &Exec{ + Timeout: config.Duration(5 * time.Second), + } }) } diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 8d99079ec..ce631d406 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -8,7 +8,7 @@ package exec import ( "bytes" "errors" - "runtime" + "strings" "testing" "time" @@ -44,243 +44,293 @@ const malformedJSON = ` "status": "green", ` -type carriageReturnTest struct { - input []byte - output []byte -} - -var crTests = []carriageReturnTest{ - {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69, - 0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65, - 0x20, 0x33}, - []byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, - 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}}, - {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, - 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}, - []byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, - 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}}, - {[]byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c, - 0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20, - 0x6c, 0x69, 0x6e, 0x65}, - []byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c, - 0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20, - 0x6c, 0x69, 0x6e, 0x65}}, -} - type runnerMock struct { out []byte errout []byte err error } -func newRunnerMock(out, errout []byte, err error) runner { - return &runnerMock{ - out: out, - errout: errout, - err: err, - } -} - -func (r runnerMock) run(string, []string, time.Duration) (out, errout []byte, err error) { +func (r runnerMock) run(string) (out, errout []byte, err error) { return r.out, r.errout, r.err } func TestExec(t *testing.T) { + // Setup parser parser := &json.Parser{MetricName: "exec"} require.NoError(t, parser.Init()) - e := &Exec{ - Log: testutil.Logger{}, - runner: newRunnerMock([]byte(validJSON), nil, nil), + + // Setup plugin + plugin := &Exec{ Commands: []string{"testcommand arg1"}, - parser: parser, + Log: testutil.Logger{}, } + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + plugin.runner = &runnerMock{out: []byte(validJSON)} + // Gather the metrics and check the result var acc testutil.Accumulator - err := acc.GatherError(e.Gather) - require.NoError(t, err) - require.Equal(t, 8, acc.NFields(), "non-numeric measurements should be ignored") + require.NoError(t, acc.GatherError(plugin.Gather)) - fields := map[string]interface{}{ - "num_processes": float64(82), - "cpu_used": float64(8234), - "cpu_free": float64(32), - "percent": float64(0.81), - "users_0": float64(0), - "users_1": float64(1), - "users_2": float64(2), - "users_3": float64(3), + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "num_processes": float64(82), + "cpu_used": float64(8234), + "cpu_free": float64(32), + "percent": float64(0.81), + "users_0": float64(0), + "users_1": float64(1), + "users_2": float64(2), + "users_3": float64(3), + }, + time.Unix(0, 0), + ), } - acc.AssertContainsFields(t, "exec", fields) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestExecMalformed(t *testing.T) { + // Setup parser parser := &json.Parser{MetricName: "exec"} require.NoError(t, parser.Init()) - e := &Exec{ - Log: testutil.Logger{}, - runner: newRunnerMock([]byte(malformedJSON), nil, nil), - Commands: []string{"badcommand arg1"}, - parser: parser, - } + // Setup plugin + plugin := &Exec{ + Commands: []string{"badcommand arg1"}, + Log: testutil.Logger{}, + } + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + plugin.runner = &runnerMock{out: []byte(malformedJSON)} + + // Gather the metrics and check the result var acc testutil.Accumulator - require.Error(t, acc.GatherError(e.Gather)) - require.Equal(t, 0, acc.NFields(), "No new points should have been added") + require.ErrorContains(t, acc.GatherError(plugin.Gather), "unexpected end of JSON input") + require.Empty(t, acc.GetTelegrafMetrics()) } func TestCommandError(t *testing.T) { + // Setup parser parser := &json.Parser{MetricName: "exec"} require.NoError(t, parser.Init()) - e := &Exec{ - Log: testutil.Logger{}, - runner: newRunnerMock(nil, nil, errors.New("exit status code 1")), - Commands: []string{"badcommand"}, - parser: parser, - } + // Setup plugin + plugin := &Exec{ + Commands: []string{"badcommand"}, + Log: testutil.Logger{}, + } + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + plugin.runner = &runnerMock{err: errors.New("exit status code 1")} + + // Gather the metrics and check the result var acc testutil.Accumulator - require.Error(t, acc.GatherError(e.Gather)) + require.ErrorContains(t, acc.GatherError(plugin.Gather), "exit status code 1 for command") require.Equal(t, 0, acc.NFields(), "No new points should have been added") } func TestCommandIgnoreError(t *testing.T) { + // Setup parser parser := &json.Parser{MetricName: "exec"} require.NoError(t, parser.Init()) - e := &Exec{ - Log: testutil.Logger{}, - runner: newRunnerMock([]byte(validJSON), []byte("error"), errors.New("exit status code 1")), + + // Setup plugin + plugin := &Exec{ Commands: []string{"badcommand"}, IgnoreError: true, - parser: parser, + Log: testutil.Logger{}, + } + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + plugin.runner = &runnerMock{ + out: []byte(validJSON), + errout: []byte("error"), + err: errors.New("exit status code 1"), } + // Gather the metrics and check the result var acc testutil.Accumulator - require.NoError(t, acc.GatherError(e.Gather)) - require.Equal(t, 8, acc.NFields(), "non-numeric measurements should be ignored") + require.NoError(t, acc.GatherError(plugin.Gather)) - fields := map[string]interface{}{ - "num_processes": float64(82), - "cpu_used": float64(8234), - "cpu_free": float64(32), - "percent": float64(0.81), - "users_0": float64(0), - "users_1": float64(1), - "users_2": float64(2), - "users_3": float64(3), + expected := []telegraf.Metric{ + metric.New( + "exec", + map[string]string{}, + map[string]interface{}{ + "num_processes": float64(82), + "cpu_used": float64(8234), + "cpu_free": float64(32), + "percent": float64(0.81), + "users_0": float64(0), + "users_1": float64(1), + "users_2": float64(2), + "users_3": float64(3), + }, + time.Unix(0, 0), + ), } - acc.AssertContainsFields(t, "exec", fields) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestExecCommandWithGlob(t *testing.T) { + // Setup parser parser := value.Parser{ MetricName: "metric", DataType: "string", } require.NoError(t, parser.Init()) - e := newExec() - e.Commands = []string{"/bin/ech* metric_value"} - e.SetParser(&parser) - - var acc testutil.Accumulator - require.NoError(t, acc.GatherError(e.Gather)) - - fields := map[string]interface{}{ - "value": "metric_value", + // Setup plugin + plugin := &Exec{ + Commands: []string{"/bin/ech* metric_value"}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, } - acc.AssertContainsFields(t, "metric", fields) + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestExecCommandWithoutGlob(t *testing.T) { + // Setup parser parser := value.Parser{ MetricName: "metric", DataType: "string", } require.NoError(t, parser.Init()) - e := newExec() - e.Commands = []string{"/bin/echo metric_value"} - e.SetParser(&parser) - - var acc testutil.Accumulator - require.NoError(t, acc.GatherError(e.Gather)) - - fields := map[string]interface{}{ - "value": "metric_value", + // Setup plugin + plugin := &Exec{ + Commands: []string{"/bin/echo metric_value"}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, } - acc.AssertContainsFields(t, "metric", fields) + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestExecCommandWithoutGlobAndPath(t *testing.T) { + // Setup parser parser := value.Parser{ MetricName: "metric", DataType: "string", } require.NoError(t, parser.Init()) - e := newExec() - e.Commands = []string{"echo metric_value"} - e.SetParser(&parser) - var acc testutil.Accumulator - require.NoError(t, acc.GatherError(e.Gather)) - - fields := map[string]interface{}{ - "value": "metric_value", + // Setup plugin + plugin := &Exec{ + Commands: []string{"echo metric_value"}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, } - acc.AssertContainsFields(t, "metric", fields) + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestExecCommandWithEnv(t *testing.T) { + // Setup parser parser := value.Parser{ MetricName: "metric", DataType: "string", } require.NoError(t, parser.Init()) - e := newExec() - e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"} - e.Environment = []string{"METRIC_NAME=metric_value"} - e.SetParser(&parser) - var acc testutil.Accumulator - require.NoError(t, acc.GatherError(e.Gather)) - - fields := map[string]interface{}{ - "value": "metric_value", + // Setup plugin + plugin := &Exec{ + Commands: []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}, + Environment: []string{"METRIC_NAME=metric_value"}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, } - acc.AssertContainsFields(t, "metric", fields) + plugin.SetParser(&parser) + require.NoError(t, plugin.Init()) + + // Gather the metrics and check the result + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "metric", + map[string]string{}, + map[string]interface{}{ + "value": "metric_value", + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestTruncate(t *testing.T) { tests := []struct { - name string - bufF func() *bytes.Buffer - expF func() *bytes.Buffer + name string + bufF func() *bytes.Buffer + expected string }{ { name: "should not truncate", bufF: func() *bytes.Buffer { - var b bytes.Buffer - b.WriteString("hello world") - return &b - }, - expF: func() *bytes.Buffer { - var b bytes.Buffer - b.WriteString("hello world") - return &b + return bytes.NewBufferString("hello world") }, + expected: "hello world", }, { name: "should truncate up to the new line", bufF: func() *bytes.Buffer { - var b bytes.Buffer - b.WriteString("hello world\nand all the people") - return &b - }, - expF: func() *bytes.Buffer { - var b bytes.Buffer - b.WriteString("hello world...") - return &b + return bytes.NewBufferString("hello world\nand all the people") }, + expected: "hello world...", }, { name: "should truncate to the maxStderrBytes", @@ -291,44 +341,19 @@ func TestTruncate(t *testing.T) { } return &b }, - expF: func() *bytes.Buffer { - var b bytes.Buffer - for i := 0; i < maxStderrBytes; i++ { - b.WriteByte('b') - } - b.WriteString("...") - return &b - }, + expected: strings.Repeat("b", maxStderrBytes) + "...", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res := truncate(*tt.bufF()) - require.Equal(t, tt.expF().Bytes(), res.Bytes()) + buf := tt.bufF() + truncate(buf) + require.Equal(t, tt.expected, buf.String()) }) } } -func TestRemoveCarriageReturns(t *testing.T) { - //nolint:staticcheck // Silence linter for now as we plan to reenable tests for Windows later - if runtime.GOOS == "windows" { - // Test that all carriage returns are removed - for _, test := range crTests { - b := bytes.NewBuffer(test.input) - out := removeWindowsCarriageReturns(*b) - require.True(t, bytes.Equal(test.output, out.Bytes())) - } - } else { - // Test that the buffer is returned unaltered - for _, test := range crTests { - b := bytes.NewBuffer(test.input) - out := removeWindowsCarriageReturns(*b) - require.True(t, bytes.Equal(test.input, out.Bytes())) - } - } -} - func TestCSVBehavior(t *testing.T) { // Setup the CSV parser parser := &csv.Parser{ @@ -339,9 +364,11 @@ func TestCSVBehavior(t *testing.T) { require.NoError(t, parser.Init()) // Setup the plugin - plugin := newExec() - plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} - plugin.Log = testutil.Logger{} + plugin := &Exec{ + Commands: []string{"echo \"a,b\n1,2\n3,4\""}, + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } plugin.SetParser(parser) require.NoError(t, plugin.Init()) @@ -407,7 +434,10 @@ func TestCSVBehavior(t *testing.T) { func TestCases(t *testing.T) { // Register the plugin inputs.Add("exec", func() telegraf.Input { - return newExec() + return &Exec{ + Timeout: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } }) // Setup the plugin diff --git a/plugins/inputs/exec/run_notwindows.go b/plugins/inputs/exec/run_notwindows.go index b36599e65..1457f0a20 100644 --- a/plugins/inputs/exec/run_notwindows.go +++ b/plugins/inputs/exec/run_notwindows.go @@ -8,43 +8,33 @@ import ( "os" "os/exec" "syscall" - "time" "github.com/kballard/go-shellquote" "github.com/influxdata/telegraf/internal" ) -func (c commandRunner) run( - command string, - environments []string, - timeout time.Duration, -) (out, errout []byte, err error) { +func (c *commandRunner) run(command string) (out, errout []byte, err error) { splitCmd, err := shellquote.Split(command) if err != nil || len(splitCmd) == 0 { - return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err) + return nil, nil, fmt.Errorf("exec: unable to parse command %q: %w", command, err) } cmd := exec.Command(splitCmd[0], splitCmd[1:]...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if len(environments) > 0 { - cmd.Env = append(os.Environ(), environments...) + if len(c.environment) > 0 { + cmd.Env = append(os.Environ(), c.environment...) } - var ( - outbuf bytes.Buffer - stderr bytes.Buffer - ) + var outbuf, stderr bytes.Buffer cmd.Stdout = &outbuf cmd.Stderr = &stderr - runErr := internal.RunTimeout(cmd, timeout) + runErr := internal.RunTimeout(cmd, c.timeout) - outbuf = removeWindowsCarriageReturns(outbuf) if stderr.Len() > 0 && !c.debug { - stderr = removeWindowsCarriageReturns(stderr) - stderr = truncate(stderr) + truncate(&stderr) } return outbuf.Bytes(), stderr.Bytes(), runErr diff --git a/plugins/inputs/exec/run_windows.go b/plugins/inputs/exec/run_windows.go index 8cef6e1e3..c07146fe9 100644 --- a/plugins/inputs/exec/run_windows.go +++ b/plugins/inputs/exec/run_windows.go @@ -4,22 +4,19 @@ package exec import ( "bytes" + "errors" "fmt" + "io" "os" "os/exec" "syscall" - "time" "github.com/kballard/go-shellquote" "github.com/influxdata/telegraf/internal" ) -func (c commandRunner) run( - command string, - environments []string, - timeout time.Duration, -) (out, errout []byte, err error) { +func (c *commandRunner) run(command string) (out, errout []byte, err error) { splitCmd, err := shellquote.Split(command) if err != nil || len(splitCmd) == 0 { return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err) @@ -30,24 +27,35 @@ func (c commandRunner) run( CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, } - if len(environments) > 0 { - cmd.Env = append(os.Environ(), environments...) + if len(c.environment) > 0 { + cmd.Env = append(os.Environ(), c.environment...) } - var ( - outbuf bytes.Buffer - stderr bytes.Buffer - ) + var outbuf, stderr bytes.Buffer cmd.Stdout = &outbuf cmd.Stderr = &stderr - runErr := internal.RunTimeout(cmd, timeout) + runErr := internal.RunTimeout(cmd, c.timeout) outbuf = removeWindowsCarriageReturns(outbuf) + stderr = removeWindowsCarriageReturns(stderr) if stderr.Len() > 0 && !c.debug { - stderr = removeWindowsCarriageReturns(stderr) - stderr = truncate(stderr) + truncate(&stderr) } return outbuf.Bytes(), stderr.Bytes(), runErr } + +func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer { + var buf bytes.Buffer + for { + byt, err := b.ReadBytes(0x0D) + byt = bytes.TrimRight(byt, "\x0d") + if len(byt) > 0 { + buf.Write(byt) + } + if errors.Is(err, io.EOF) { + return buf + } + } +}