feat(inputs.exec): Allow to get untruncated errors in debug mode (#16501)

This commit is contained in:
Sven Rebhan 2025-03-05 19:17:54 +01:00 committed by GitHub
parent b122159245
commit d60bcd38d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 325 additions and 314 deletions

View File

@ -4,11 +4,8 @@ package exec
import ( import (
"bytes" "bytes"
_ "embed" _ "embed"
"errors"
"fmt" "fmt"
"io"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -48,18 +45,31 @@ type Exec struct {
type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric
type runner interface { type runner interface {
run(string, []string, time.Duration) ([]byte, []byte, error) run(string) ([]byte, []byte, error)
} }
type commandRunner struct { type commandRunner struct {
debug bool environment []string
timeout time.Duration
debug bool
} }
func (*Exec) SampleConfig() string { func (*Exec) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (*Exec) Init() error { func (e *Exec) Init() error {
// Legacy single command support
if e.Command != "" {
e.Commands = append(e.Commands, e.Command)
}
e.runner = &commandRunner{
environment: e.Environment,
timeout: time.Duration(e.Timeout),
debug: e.Log.Level().Includes(telegraf.Debug),
}
return nil return nil
} }
@ -68,60 +78,93 @@ func (e *Exec) SetParser(parser telegraf.Parser) {
unwrapped, ok := parser.(*models.RunningParser) unwrapped, ok := parser.(*models.RunningParser)
if ok { if ok {
if _, ok := unwrapped.Parser.(*nagios.Parser); ok { if _, ok := unwrapped.Parser.(*nagios.Parser); ok {
e.exitCodeHandler = nagiosHandler e.exitCodeHandler = func(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric {
return nagios.AddState(err, msg, metrics)
}
e.parseDespiteError = true e.parseDespiteError = true
} }
} }
} }
func (e *Exec) Gather(acc telegraf.Accumulator) error { func (e *Exec) Gather(acc telegraf.Accumulator) error {
commands := e.updateRunners()
var wg sync.WaitGroup var wg sync.WaitGroup
// Legacy single command support for _, cmd := range commands {
if e.Command != "" { wg.Add(1)
e.Commands = append(e.Commands, e.Command)
e.Command = ""
}
commands := make([]string, 0, len(e.Commands)) go func(c string) {
for _, pattern := range e.Commands { defer wg.Done()
cmdAndArgs := strings.SplitN(pattern, " ", 2) acc.AddError(e.processCommand(acc, c))
if len(cmdAndArgs) == 0 { }(cmd)
continue
}
matches, err := filepath.Glob(cmdAndArgs[0])
if err != nil {
acc.AddError(err)
continue
}
if len(matches) == 0 {
// There were no matches with the glob pattern, so let's assume
// that the command is in PATH and just run it as it is
commands = append(commands, pattern)
} else {
// There were matches, so we'll append each match together with
// the arguments to the commands slice
for _, match := range matches {
if len(cmdAndArgs) == 1 {
commands = append(commands, match)
} else {
commands = append(commands,
strings.Join([]string{match, cmdAndArgs[1]}, " "))
}
}
}
}
wg.Add(len(commands))
for _, command := range commands {
go e.processCommand(command, acc, &wg)
} }
wg.Wait() wg.Wait()
return nil return nil
} }
func truncate(buf bytes.Buffer) bytes.Buffer { func (e *Exec) updateRunners() []string {
commands := make([]string, 0, len(e.Commands))
for _, pattern := range e.Commands {
if pattern == "" {
continue
}
// Try to expand globbing expressions
cmd, args, found := strings.Cut(pattern, " ")
matches, err := filepath.Glob(cmd)
if err != nil {
e.Log.Errorf("Matching command %q failed: %v", cmd, err)
continue
}
if len(matches) == 0 {
// There were no matches with the glob pattern, so let's assume
// the command is in PATH and just run it as it is
commands = append(commands, pattern)
} else {
// There were matches, so we'll append each match together with
// the arguments to the commands slice
for _, match := range matches {
if found {
match += " " + args
}
commands = append(commands, match)
}
}
}
return commands
}
func (e *Exec) processCommand(acc telegraf.Accumulator, cmd string) error {
out, errBuf, runErr := e.runner.run(cmd)
if !e.IgnoreError && !e.parseDespiteError && runErr != nil {
return fmt.Errorf("exec: %w for command %q: %s", runErr, cmd, string(errBuf))
}
metrics, err := e.parser.Parse(out)
if err != nil {
return err
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if e.exitCodeHandler != nil {
metrics = e.exitCodeHandler(metrics, runErr, errBuf)
}
for _, m := range metrics {
acc.AddMetric(m)
}
return nil
}
func truncate(buf *bytes.Buffer) {
// Limit the number of bytes. // Limit the number of bytes.
didTruncate := false didTruncate := false
if buf.Len() > maxStderrBytes { if buf.Len() > maxStderrBytes {
@ -138,72 +181,12 @@ func truncate(buf bytes.Buffer) bytes.Buffer {
if didTruncate { if didTruncate {
buf.WriteString("...") buf.WriteString("...")
} }
return buf
}
// removeWindowsCarriageReturns removes all carriage returns from the input if the
// OS is Windows. It does not return any errors.
func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
if runtime.GOOS == "windows" {
var buf bytes.Buffer
for {
byt, err := b.ReadBytes(0x0D)
byt = bytes.TrimRight(byt, "\x0d")
if len(byt) > 0 {
buf.Write(byt)
}
if errors.Is(err, io.EOF) {
return buf
}
}
}
return b
}
func (e *Exec) processCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
out, errBuf, runErr := e.runner.run(command, e.Environment, time.Duration(e.Timeout))
if !e.IgnoreError && !e.parseDespiteError && runErr != nil {
err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf))
acc.AddError(err)
return
}
metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
return
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if e.exitCodeHandler != nil {
metrics = e.exitCodeHandler(metrics, runErr, errBuf)
}
for _, m := range metrics {
acc.AddMetric(m)
}
}
func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric {
return nagios.AddState(err, msg, metrics)
}
func newExec() *Exec {
return &Exec{
runner: commandRunner{},
Timeout: config.Duration(time.Second * 5),
}
} }
func init() { func init() {
inputs.Add("exec", func() telegraf.Input { inputs.Add("exec", func() telegraf.Input {
return newExec() return &Exec{
Timeout: config.Duration(5 * time.Second),
}
}) })
} }

View File

@ -8,7 +8,7 @@ package exec
import ( import (
"bytes" "bytes"
"errors" "errors"
"runtime" "strings"
"testing" "testing"
"time" "time"
@ -44,243 +44,293 @@ const malformedJSON = `
"status": "green", "status": "green",
` `
type carriageReturnTest struct {
input []byte
output []byte
}
var crTests = []carriageReturnTest{
{[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69,
0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65,
0x20, 0x33},
[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e,
0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}},
{[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e,
0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33},
[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e,
0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}},
{[]byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c,
0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20,
0x6c, 0x69, 0x6e, 0x65},
[]byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c,
0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20,
0x6c, 0x69, 0x6e, 0x65}},
}
type runnerMock struct { type runnerMock struct {
out []byte out []byte
errout []byte errout []byte
err error err error
} }
func newRunnerMock(out, errout []byte, err error) runner { func (r runnerMock) run(string) (out, errout []byte, err error) {
return &runnerMock{
out: out,
errout: errout,
err: err,
}
}
func (r runnerMock) run(string, []string, time.Duration) (out, errout []byte, err error) {
return r.out, r.errout, r.err return r.out, r.errout, r.err
} }
func TestExec(t *testing.T) { func TestExec(t *testing.T) {
// Setup parser
parser := &json.Parser{MetricName: "exec"} parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{}, // Setup plugin
runner: newRunnerMock([]byte(validJSON), nil, nil), plugin := &Exec{
Commands: []string{"testcommand arg1"}, Commands: []string{"testcommand arg1"},
parser: parser, Log: testutil.Logger{},
} }
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
plugin.runner = &runnerMock{out: []byte(validJSON)}
// Gather the metrics and check the result
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(e.Gather) require.NoError(t, acc.GatherError(plugin.Gather))
require.NoError(t, err)
require.Equal(t, 8, acc.NFields(), "non-numeric measurements should be ignored")
fields := map[string]interface{}{ expected := []telegraf.Metric{
"num_processes": float64(82), metric.New(
"cpu_used": float64(8234), "exec",
"cpu_free": float64(32), map[string]string{},
"percent": float64(0.81), map[string]interface{}{
"users_0": float64(0), "num_processes": float64(82),
"users_1": float64(1), "cpu_used": float64(8234),
"users_2": float64(2), "cpu_free": float64(32),
"users_3": float64(3), "percent": float64(0.81),
"users_0": float64(0),
"users_1": float64(1),
"users_2": float64(2),
"users_3": float64(3),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsFields(t, "exec", fields) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestExecMalformed(t *testing.T) { func TestExecMalformed(t *testing.T) {
// Setup parser
parser := &json.Parser{MetricName: "exec"} parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{},
runner: newRunnerMock([]byte(malformedJSON), nil, nil),
Commands: []string{"badcommand arg1"},
parser: parser,
}
// Setup plugin
plugin := &Exec{
Commands: []string{"badcommand arg1"},
Log: testutil.Logger{},
}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
plugin.runner = &runnerMock{out: []byte(malformedJSON)}
// Gather the metrics and check the result
var acc testutil.Accumulator var acc testutil.Accumulator
require.Error(t, acc.GatherError(e.Gather)) require.ErrorContains(t, acc.GatherError(plugin.Gather), "unexpected end of JSON input")
require.Equal(t, 0, acc.NFields(), "No new points should have been added") require.Empty(t, acc.GetTelegrafMetrics())
} }
func TestCommandError(t *testing.T) { func TestCommandError(t *testing.T) {
// Setup parser
parser := &json.Parser{MetricName: "exec"} parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{},
runner: newRunnerMock(nil, nil, errors.New("exit status code 1")),
Commands: []string{"badcommand"},
parser: parser,
}
// Setup plugin
plugin := &Exec{
Commands: []string{"badcommand"},
Log: testutil.Logger{},
}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
plugin.runner = &runnerMock{err: errors.New("exit status code 1")}
// Gather the metrics and check the result
var acc testutil.Accumulator var acc testutil.Accumulator
require.Error(t, acc.GatherError(e.Gather)) require.ErrorContains(t, acc.GatherError(plugin.Gather), "exit status code 1 for command")
require.Equal(t, 0, acc.NFields(), "No new points should have been added") require.Equal(t, 0, acc.NFields(), "No new points should have been added")
} }
func TestCommandIgnoreError(t *testing.T) { func TestCommandIgnoreError(t *testing.T) {
// Setup parser
parser := &json.Parser{MetricName: "exec"} parser := &json.Parser{MetricName: "exec"}
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := &Exec{
Log: testutil.Logger{}, // Setup plugin
runner: newRunnerMock([]byte(validJSON), []byte("error"), errors.New("exit status code 1")), plugin := &Exec{
Commands: []string{"badcommand"}, Commands: []string{"badcommand"},
IgnoreError: true, IgnoreError: true,
parser: parser, Log: testutil.Logger{},
}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
plugin.runner = &runnerMock{
out: []byte(validJSON),
errout: []byte("error"),
err: errors.New("exit status code 1"),
} }
// Gather the metrics and check the result
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, acc.GatherError(e.Gather)) require.NoError(t, acc.GatherError(plugin.Gather))
require.Equal(t, 8, acc.NFields(), "non-numeric measurements should be ignored")
fields := map[string]interface{}{ expected := []telegraf.Metric{
"num_processes": float64(82), metric.New(
"cpu_used": float64(8234), "exec",
"cpu_free": float64(32), map[string]string{},
"percent": float64(0.81), map[string]interface{}{
"users_0": float64(0), "num_processes": float64(82),
"users_1": float64(1), "cpu_used": float64(8234),
"users_2": float64(2), "cpu_free": float64(32),
"users_3": float64(3), "percent": float64(0.81),
"users_0": float64(0),
"users_1": float64(1),
"users_2": float64(2),
"users_3": float64(3),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsFields(t, "exec", fields) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestExecCommandWithGlob(t *testing.T) { func TestExecCommandWithGlob(t *testing.T) {
// Setup parser
parser := value.Parser{ parser := value.Parser{
MetricName: "metric", MetricName: "metric",
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := newExec() // Setup plugin
e.Commands = []string{"/bin/ech* metric_value"} plugin := &Exec{
e.SetParser(&parser) Commands: []string{"/bin/ech* metric_value"},
Timeout: config.Duration(5 * time.Second),
var acc testutil.Accumulator Log: testutil.Logger{},
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",
} }
acc.AssertContainsFields(t, "metric", fields) plugin.SetParser(&parser)
require.NoError(t, plugin.Init())
// Gather the metrics and check the result
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
expected := []telegraf.Metric{
metric.New(
"metric",
map[string]string{},
map[string]interface{}{
"value": "metric_value",
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestExecCommandWithoutGlob(t *testing.T) { func TestExecCommandWithoutGlob(t *testing.T) {
// Setup parser
parser := value.Parser{ parser := value.Parser{
MetricName: "metric", MetricName: "metric",
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := newExec() // Setup plugin
e.Commands = []string{"/bin/echo metric_value"} plugin := &Exec{
e.SetParser(&parser) Commands: []string{"/bin/echo metric_value"},
Timeout: config.Duration(5 * time.Second),
var acc testutil.Accumulator Log: testutil.Logger{},
require.NoError(t, acc.GatherError(e.Gather))
fields := map[string]interface{}{
"value": "metric_value",
} }
acc.AssertContainsFields(t, "metric", fields) plugin.SetParser(&parser)
require.NoError(t, plugin.Init())
// Gather the metrics and check the result
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
expected := []telegraf.Metric{
metric.New(
"metric",
map[string]string{},
map[string]interface{}{
"value": "metric_value",
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestExecCommandWithoutGlobAndPath(t *testing.T) { func TestExecCommandWithoutGlobAndPath(t *testing.T) {
// Setup parser
parser := value.Parser{ parser := value.Parser{
MetricName: "metric", MetricName: "metric",
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := newExec()
e.Commands = []string{"echo metric_value"}
e.SetParser(&parser)
var acc testutil.Accumulator // Setup plugin
require.NoError(t, acc.GatherError(e.Gather)) plugin := &Exec{
Commands: []string{"echo metric_value"},
fields := map[string]interface{}{ Timeout: config.Duration(5 * time.Second),
"value": "metric_value", Log: testutil.Logger{},
} }
acc.AssertContainsFields(t, "metric", fields) plugin.SetParser(&parser)
require.NoError(t, plugin.Init())
// Gather the metrics and check the result
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
expected := []telegraf.Metric{
metric.New(
"metric",
map[string]string{},
map[string]interface{}{
"value": "metric_value",
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestExecCommandWithEnv(t *testing.T) { func TestExecCommandWithEnv(t *testing.T) {
// Setup parser
parser := value.Parser{ parser := value.Parser{
MetricName: "metric", MetricName: "metric",
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := newExec()
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
e.Environment = []string{"METRIC_NAME=metric_value"}
e.SetParser(&parser)
var acc testutil.Accumulator // Setup plugin
require.NoError(t, acc.GatherError(e.Gather)) plugin := &Exec{
Commands: []string{"/bin/sh -c 'echo ${METRIC_NAME}'"},
fields := map[string]interface{}{ Environment: []string{"METRIC_NAME=metric_value"},
"value": "metric_value", Timeout: config.Duration(5 * time.Second),
Log: testutil.Logger{},
} }
acc.AssertContainsFields(t, "metric", fields) plugin.SetParser(&parser)
require.NoError(t, plugin.Init())
// Gather the metrics and check the result
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
expected := []telegraf.Metric{
metric.New(
"metric",
map[string]string{},
map[string]interface{}{
"value": "metric_value",
},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
} }
func TestTruncate(t *testing.T) { func TestTruncate(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
bufF func() *bytes.Buffer bufF func() *bytes.Buffer
expF func() *bytes.Buffer expected string
}{ }{
{ {
name: "should not truncate", name: "should not truncate",
bufF: func() *bytes.Buffer { bufF: func() *bytes.Buffer {
var b bytes.Buffer return bytes.NewBufferString("hello world")
b.WriteString("hello world")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
}, },
expected: "hello world",
}, },
{ {
name: "should truncate up to the new line", name: "should truncate up to the new line",
bufF: func() *bytes.Buffer { bufF: func() *bytes.Buffer {
var b bytes.Buffer return bytes.NewBufferString("hello world\nand all the people")
b.WriteString("hello world\nand all the people")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world...")
return &b
}, },
expected: "hello world...",
}, },
{ {
name: "should truncate to the maxStderrBytes", name: "should truncate to the maxStderrBytes",
@ -291,44 +341,19 @@ func TestTruncate(t *testing.T) {
} }
return &b return &b
}, },
expF: func() *bytes.Buffer { expected: strings.Repeat("b", maxStderrBytes) + "...",
var b bytes.Buffer
for i := 0; i < maxStderrBytes; i++ {
b.WriteByte('b')
}
b.WriteString("...")
return &b
},
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
res := truncate(*tt.bufF()) buf := tt.bufF()
require.Equal(t, tt.expF().Bytes(), res.Bytes()) truncate(buf)
require.Equal(t, tt.expected, buf.String())
}) })
} }
} }
func TestRemoveCarriageReturns(t *testing.T) {
//nolint:staticcheck // Silence linter for now as we plan to reenable tests for Windows later
if runtime.GOOS == "windows" {
// Test that all carriage returns are removed
for _, test := range crTests {
b := bytes.NewBuffer(test.input)
out := removeWindowsCarriageReturns(*b)
require.True(t, bytes.Equal(test.output, out.Bytes()))
}
} else {
// Test that the buffer is returned unaltered
for _, test := range crTests {
b := bytes.NewBuffer(test.input)
out := removeWindowsCarriageReturns(*b)
require.True(t, bytes.Equal(test.input, out.Bytes()))
}
}
}
func TestCSVBehavior(t *testing.T) { func TestCSVBehavior(t *testing.T) {
// Setup the CSV parser // Setup the CSV parser
parser := &csv.Parser{ parser := &csv.Parser{
@ -339,9 +364,11 @@ func TestCSVBehavior(t *testing.T) {
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
// Setup the plugin // Setup the plugin
plugin := newExec() plugin := &Exec{
plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} Commands: []string{"echo \"a,b\n1,2\n3,4\""},
plugin.Log = testutil.Logger{} Timeout: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
plugin.SetParser(parser) plugin.SetParser(parser)
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -407,7 +434,10 @@ func TestCSVBehavior(t *testing.T) {
func TestCases(t *testing.T) { func TestCases(t *testing.T) {
// Register the plugin // Register the plugin
inputs.Add("exec", func() telegraf.Input { inputs.Add("exec", func() telegraf.Input {
return newExec() return &Exec{
Timeout: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
}) })
// Setup the plugin // Setup the plugin

View File

@ -8,43 +8,33 @@ import (
"os" "os"
"os/exec" "os/exec"
"syscall" "syscall"
"time"
"github.com/kballard/go-shellquote" "github.com/kballard/go-shellquote"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
) )
func (c commandRunner) run( func (c *commandRunner) run(command string) (out, errout []byte, err error) {
command string,
environments []string,
timeout time.Duration,
) (out, errout []byte, err error) {
splitCmd, err := shellquote.Split(command) splitCmd, err := shellquote.Split(command)
if err != nil || len(splitCmd) == 0 { if err != nil || len(splitCmd) == 0 {
return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err) return nil, nil, fmt.Errorf("exec: unable to parse command %q: %w", command, err)
} }
cmd := exec.Command(splitCmd[0], splitCmd[1:]...) cmd := exec.Command(splitCmd[0], splitCmd[1:]...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if len(environments) > 0 { if len(c.environment) > 0 {
cmd.Env = append(os.Environ(), environments...) cmd.Env = append(os.Environ(), c.environment...)
} }
var ( var outbuf, stderr bytes.Buffer
outbuf bytes.Buffer
stderr bytes.Buffer
)
cmd.Stdout = &outbuf cmd.Stdout = &outbuf
cmd.Stderr = &stderr cmd.Stderr = &stderr
runErr := internal.RunTimeout(cmd, timeout) runErr := internal.RunTimeout(cmd, c.timeout)
outbuf = removeWindowsCarriageReturns(outbuf)
if stderr.Len() > 0 && !c.debug { if stderr.Len() > 0 && !c.debug {
stderr = removeWindowsCarriageReturns(stderr) truncate(&stderr)
stderr = truncate(stderr)
} }
return outbuf.Bytes(), stderr.Bytes(), runErr return outbuf.Bytes(), stderr.Bytes(), runErr

View File

@ -4,22 +4,19 @@ package exec
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"syscall" "syscall"
"time"
"github.com/kballard/go-shellquote" "github.com/kballard/go-shellquote"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
) )
func (c commandRunner) run( func (c *commandRunner) run(command string) (out, errout []byte, err error) {
command string,
environments []string,
timeout time.Duration,
) (out, errout []byte, err error) {
splitCmd, err := shellquote.Split(command) splitCmd, err := shellquote.Split(command)
if err != nil || len(splitCmd) == 0 { if err != nil || len(splitCmd) == 0 {
return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err) return nil, nil, fmt.Errorf("exec: unable to parse command: %w", err)
@ -30,24 +27,35 @@ func (c commandRunner) run(
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
} }
if len(environments) > 0 { if len(c.environment) > 0 {
cmd.Env = append(os.Environ(), environments...) cmd.Env = append(os.Environ(), c.environment...)
} }
var ( var outbuf, stderr bytes.Buffer
outbuf bytes.Buffer
stderr bytes.Buffer
)
cmd.Stdout = &outbuf cmd.Stdout = &outbuf
cmd.Stderr = &stderr cmd.Stderr = &stderr
runErr := internal.RunTimeout(cmd, timeout) runErr := internal.RunTimeout(cmd, c.timeout)
outbuf = removeWindowsCarriageReturns(outbuf) outbuf = removeWindowsCarriageReturns(outbuf)
stderr = removeWindowsCarriageReturns(stderr)
if stderr.Len() > 0 && !c.debug { if stderr.Len() > 0 && !c.debug {
stderr = removeWindowsCarriageReturns(stderr) truncate(&stderr)
stderr = truncate(stderr)
} }
return outbuf.Bytes(), stderr.Bytes(), runErr return outbuf.Bytes(), stderr.Bytes(), runErr
} }
func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
var buf bytes.Buffer
for {
byt, err := b.ReadBytes(0x0D)
byt = bytes.TrimRight(byt, "\x0d")
if len(byt) > 0 {
buf.Write(byt)
}
if errors.Is(err, io.EOF) {
return buf
}
}
}