feat(exec, execd): add an option to pass a custom environment to their child process (#11049)
This commit is contained in:
parent
4a15e0aba8
commit
fed88fcb44
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
|
@ -26,13 +27,14 @@ type Process struct {
|
|||
|
||||
name string
|
||||
args []string
|
||||
envs []string
|
||||
pid int32
|
||||
cancel context.CancelFunc
|
||||
mainLoopWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// New creates a new process wrapper
|
||||
func New(command []string) (*Process, error) {
|
||||
func New(command []string, envs []string) (*Process, error) {
|
||||
if len(command) == 0 {
|
||||
return nil, errors.New("no command")
|
||||
}
|
||||
|
|
@ -41,6 +43,7 @@ func New(command []string) (*Process, error) {
|
|||
RestartDelay: 5 * time.Second,
|
||||
name: command[0],
|
||||
args: []string{},
|
||||
envs: envs,
|
||||
}
|
||||
|
||||
if len(command) > 1 {
|
||||
|
|
@ -85,6 +88,10 @@ func (p *Process) Stop() {
|
|||
func (p *Process) cmdStart() error {
|
||||
p.Cmd = exec.Command(p.name, p.args...)
|
||||
|
||||
if len(p.envs) > 0 {
|
||||
p.Cmd.Env = append(os.Environ(), p.envs...)
|
||||
}
|
||||
|
||||
var err error
|
||||
p.Stdin, err = p.Cmd.StdinPipe()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ func TestRestartingRebindsPipes(t *testing.T) {
|
|||
exe, err := os.Executable()
|
||||
require.NoError(t, err)
|
||||
|
||||
p, err := New([]string{exe, "-external"})
|
||||
p, err := New([]string{exe, "-external"}, []string{"INTERNAL_PROCESS_MODE=application"})
|
||||
p.RestartDelay = 100 * time.Nanosecond
|
||||
p.Log = testutil.Logger{}
|
||||
require.NoError(t, err)
|
||||
|
|
@ -62,7 +62,8 @@ var external = flag.Bool("external", false,
|
|||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if *external {
|
||||
runMode := os.Getenv("INTERNAL_PROCESS_MODE")
|
||||
if *external && runMode == "application" {
|
||||
externalProcess()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,12 @@ This plugin can be used to poll for custom metrics from any source.
|
|||
"/tmp/collect_*.sh"
|
||||
]
|
||||
|
||||
## Environment variables
|
||||
## Array of "key=value" pairs to pass as environment variables
|
||||
## e.g. "KEY=value", "USERNAME=John Doe",
|
||||
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
|
||||
# environment = []
|
||||
|
||||
## Timeout for each command to complete.
|
||||
timeout = "5s"
|
||||
|
||||
|
|
@ -55,7 +61,7 @@ It can be paired with the following configuration and will be run at the `interv
|
|||
|
||||
### My script works when I run it by hand, but not when Telegraf is running as a service
|
||||
|
||||
This may be related to the Telegraf service running as a different user. The
|
||||
This may be related to the Telegraf service running as a different user. The
|
||||
official packages run Telegraf as the `telegraf` user and group on Linux
|
||||
systems.
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
osExec "os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
|
@ -24,9 +25,10 @@ import (
|
|||
const MaxStderrBytes int = 512
|
||||
|
||||
type Exec struct {
|
||||
Commands []string `toml:"commands"`
|
||||
Command string `toml:"command"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
Commands []string `toml:"commands"`
|
||||
Command string `toml:"command"`
|
||||
Environment []string `toml:"environment"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
|
||||
parser parsers.Parser
|
||||
|
||||
|
|
@ -42,13 +44,14 @@ func NewExec() *Exec {
|
|||
}
|
||||
|
||||
type Runner interface {
|
||||
Run(string, time.Duration) ([]byte, []byte, error)
|
||||
Run(string, []string, time.Duration) ([]byte, []byte, error)
|
||||
}
|
||||
|
||||
type CommandRunner struct{}
|
||||
|
||||
func (c CommandRunner) Run(
|
||||
command string,
|
||||
environments []string,
|
||||
timeout time.Duration,
|
||||
) ([]byte, []byte, error) {
|
||||
splitCmd, err := shellquote.Split(command)
|
||||
|
|
@ -58,6 +61,10 @@ func (c CommandRunner) Run(
|
|||
|
||||
cmd := osExec.Command(splitCmd[0], splitCmd[1:]...)
|
||||
|
||||
if len(environments) > 0 {
|
||||
cmd.Env = append(os.Environ(), environments...)
|
||||
}
|
||||
|
||||
var (
|
||||
out bytes.Buffer
|
||||
stderr bytes.Buffer
|
||||
|
|
@ -120,7 +127,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
|
|||
defer wg.Done()
|
||||
_, isNagios := e.parser.(*nagios.NagiosParser)
|
||||
|
||||
out, errbuf, runErr := e.runner.Run(command, time.Duration(e.Timeout))
|
||||
out, errbuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout))
|
||||
if !isNagios && runErr != nil {
|
||||
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
|
||||
acc.AddError(err)
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ func newRunnerMock(out []byte, errout []byte, err error) Runner {
|
|||
}
|
||||
}
|
||||
|
||||
func (r runnerMock) Run(_ string, _ time.Duration) ([]byte, []byte, error) {
|
||||
func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) {
|
||||
return r.out, r.errout, r.err
|
||||
}
|
||||
|
||||
|
|
@ -191,6 +191,23 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
|
|||
acc.AssertContainsFields(t, "metric", fields)
|
||||
}
|
||||
|
||||
func TestExecCommandWithEnv(t *testing.T) {
|
||||
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
|
||||
e := NewExec()
|
||||
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
|
||||
e.Environment = []string{"METRIC_NAME=metric_value"}
|
||||
e.SetParser(parser)
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err := acc.GatherError(e.Gather)
|
||||
require.NoError(t, err)
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"value": "metric_value",
|
||||
}
|
||||
acc.AssertContainsFields(t, "metric", fields)
|
||||
}
|
||||
|
||||
func TestTruncate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -22,6 +22,12 @@ STDERR from the process will be relayed to Telegraf as errors in the logs.
|
|||
## NOTE: process and each argument should each be their own string
|
||||
command = ["telegraf-smartctl", "-d", "/dev/sda"]
|
||||
|
||||
## Environment variables
|
||||
## Array of "key=value" pairs to pass as environment variables
|
||||
## e.g. "KEY=value", "USERNAME=John Doe",
|
||||
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
|
||||
# environment = []
|
||||
|
||||
## Define how the process is signaled on each collection interval.
|
||||
## Valid values are:
|
||||
## "none" : Do not signal anything. (Recommended for service inputs)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
type Execd struct {
|
||||
Command []string `toml:"command"`
|
||||
Environment []string `toml:"environment"`
|
||||
Signal string `toml:"signal"`
|
||||
RestartDelay config.Duration `toml:"restart_delay"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
|
@ -35,7 +36,7 @@ func (e *Execd) SetParser(parser parsers.Parser) {
|
|||
func (e *Execd) Start(acc telegraf.Accumulator) error {
|
||||
e.acc = acc
|
||||
var err error
|
||||
e.process, err = process.New(e.Command)
|
||||
e.process, err = process.New(e.Command, e.Environment)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating new process: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ func TestSettingConfigWorks(t *testing.T) {
|
|||
cfg := `
|
||||
[[inputs.execd]]
|
||||
command = ["a", "b", "c"]
|
||||
environment = ["d=e", "f=1"]
|
||||
restart_delay = "1m"
|
||||
signal = "SIGHUP"
|
||||
`
|
||||
|
|
@ -35,6 +36,7 @@ func TestSettingConfigWorks(t *testing.T) {
|
|||
inp, ok := conf.Inputs[0].Input.(*Execd)
|
||||
require.True(t, ok)
|
||||
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
|
||||
require.EqualValues(t, []string{"d=e", "f=1"}, inp.Environment)
|
||||
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
|
||||
require.EqualValues(t, "SIGHUP", inp.Signal)
|
||||
}
|
||||
|
|
@ -48,6 +50,7 @@ func TestExternalInputWorks(t *testing.T) {
|
|||
|
||||
e := &Execd{
|
||||
Command: []string{exe, "-counter"},
|
||||
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
|
||||
RestartDelay: config.Duration(5 * time.Second),
|
||||
parser: influxParser,
|
||||
Signal: "STDIN",
|
||||
|
|
@ -152,7 +155,8 @@ var counter = flag.Bool("counter", false,
|
|||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if *counter {
|
||||
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
|
||||
if *counter && runMode == "application" {
|
||||
if err := runCounterProgram(); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
@ -163,6 +167,7 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func runCounterProgram() error {
|
||||
envMetricName := os.Getenv("METRIC_NAME")
|
||||
i := 0
|
||||
serializer, err := serializers.NewInfluxSerializer()
|
||||
if err != nil {
|
||||
|
|
@ -173,7 +178,7 @@ func runCounterProgram() error {
|
|||
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
m := metric.New("counter",
|
||||
m := metric.New(envMetricName,
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"count": i,
|
||||
|
|
|
|||
|
|
@ -20,6 +20,12 @@ For better performance, consider execd, which runs continuously.
|
|||
## Command to ingest metrics via stdin.
|
||||
command = ["tee", "-a", "/dev/null"]
|
||||
|
||||
## Environment variables
|
||||
## Array of "key=value" pairs to pass as environment variables
|
||||
## e.g. "KEY=value", "USERNAME=John Doe",
|
||||
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
|
||||
# environment = []
|
||||
|
||||
## Timeout for command to complete.
|
||||
# timeout = "5s"
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"time"
|
||||
|
|
@ -19,9 +20,10 @@ const maxStderrBytes = 512
|
|||
|
||||
// Exec defines the exec output plugin.
|
||||
type Exec struct {
|
||||
Command []string `toml:"command"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
Command []string `toml:"command"`
|
||||
Environment []string `toml:"environment"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
runner Runner
|
||||
serializer serializers.Serializer
|
||||
|
|
@ -61,12 +63,12 @@ func (e *Exec) Write(metrics []telegraf.Metric) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return e.runner.Run(time.Duration(e.Timeout), e.Command, &buffer)
|
||||
return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
|
||||
}
|
||||
|
||||
// Runner provides an interface for running exec.Cmd.
|
||||
type Runner interface {
|
||||
Run(time.Duration, []string, io.Reader) error
|
||||
Run(time.Duration, []string, []string, io.Reader) error
|
||||
}
|
||||
|
||||
// CommandRunner runs a command with the ability to kill the process before the timeout.
|
||||
|
|
@ -76,8 +78,11 @@ type CommandRunner struct {
|
|||
}
|
||||
|
||||
// Run runs the command.
|
||||
func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.Reader) error {
|
||||
func (c *CommandRunner) Run(timeout time.Duration, command []string, environments []string, buffer io.Reader) error {
|
||||
cmd := exec.Command(command[0], command[1:]...)
|
||||
if len(environments) > 0 {
|
||||
cmd.Env = append(os.Environ(), environments...)
|
||||
}
|
||||
cmd.Stdin = buffer
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
|
|
|
|||
|
|
@ -13,6 +13,12 @@ Telegraf minimum version: Telegraf 1.15.0
|
|||
## NOTE: process and each argument should each be their own string
|
||||
command = ["my-telegraf-output", "--some-flag", "value"]
|
||||
|
||||
## Environment variables
|
||||
## Array of "key=value" pairs to pass as environment variables
|
||||
## e.g. "KEY=value", "USERNAME=John Doe",
|
||||
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
|
||||
# environment = []
|
||||
|
||||
## Delay before the process is restarted after an unexpected termination
|
||||
restart_delay = "10s"
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
type Execd struct {
|
||||
Command []string `toml:"command"`
|
||||
Environment []string `toml:"environment"`
|
||||
RestartDelay config.Duration `toml:"restart_delay"`
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -34,7 +35,7 @@ func (e *Execd) Init() error {
|
|||
|
||||
var err error
|
||||
|
||||
e.process, err = process.New(e.Command)
|
||||
e.process, err = process.New(e.Command, e.Environment)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating process %s: %w", e.Command, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ func TestExternalOutputWorks(t *testing.T) {
|
|||
|
||||
e := &Execd{
|
||||
Command: []string{exe, "-testoutput"},
|
||||
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
|
||||
RestartDelay: config.Duration(5 * time.Second),
|
||||
serializer: influxSerializer,
|
||||
Log: testutil.Logger{},
|
||||
|
|
@ -74,7 +75,8 @@ var testoutput = flag.Bool("testoutput", false,
|
|||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if *testoutput {
|
||||
runMode := os.Getenv("PLUGINS_OUTPUTS_EXECD_MODE")
|
||||
if *testoutput && runMode == "application" {
|
||||
runOutputConsumerProgram()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
|
@ -83,6 +85,7 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func runOutputConsumerProgram() {
|
||||
metricName := os.Getenv("METRIC_NAME")
|
||||
parser := influx.NewStreamParser(os.Stdin)
|
||||
|
||||
for {
|
||||
|
|
@ -103,7 +106,7 @@ func runOutputConsumerProgram() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
expected := testutil.MustMetric("cpu",
|
||||
expected := testutil.MustMetric(metricName,
|
||||
map[string]string{"name": "cpu1"},
|
||||
map[string]interface{}{"idle": 50, "sys": 30},
|
||||
now,
|
||||
|
|
|
|||
|
|
@ -30,6 +30,12 @@ Telegraf minimum version: Telegraf 1.15.0
|
|||
## eg: command = ["/path/to/your_program", "arg1", "arg2"]
|
||||
command = ["cat"]
|
||||
|
||||
## Environment variables
|
||||
## Array of "key=value" pairs to pass as environment variables
|
||||
## e.g. "KEY=value", "USERNAME=John Doe",
|
||||
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
|
||||
# environment = []
|
||||
|
||||
## Delay before the process is restarted after an unexpected termination
|
||||
# restart_delay = "10s"
|
||||
```
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
type Execd struct {
|
||||
Command []string `toml:"command"`
|
||||
Environment []string `toml:"environment"`
|
||||
RestartDelay config.Duration `toml:"restart_delay"`
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -54,7 +55,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
|
|||
}
|
||||
e.acc = acc
|
||||
|
||||
e.process, err = process.New(e.Command)
|
||||
e.process, err = process.New(e.Command, e.Environment)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating new process: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ func TestExternalProcessorWorks(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
t.Log(exe)
|
||||
e.Command = []string{exe, "-countmultiplier"}
|
||||
e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"}
|
||||
e.RestartDelay = config.Duration(5 * time.Second)
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
|
|
@ -84,6 +85,7 @@ func TestParseLinesWithNewLines(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
t.Log(exe)
|
||||
e.Command = []string{exe, "-countmultiplier"}
|
||||
e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"}
|
||||
e.RestartDelay = config.Duration(5 * time.Second)
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
|
|
@ -129,7 +131,8 @@ var countmultiplier = flag.Bool("countmultiplier", false,
|
|||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if *countmultiplier {
|
||||
runMode := os.Getenv("PLUGINS_PROCESSORS_EXECD_MODE")
|
||||
if *countmultiplier && runMode == "application" {
|
||||
runCountMultiplierProgram()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
|
@ -138,6 +141,7 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func runCountMultiplierProgram() {
|
||||
fieldName := os.Getenv("FIELD_NAME")
|
||||
parser := influx.NewStreamParser(os.Stdin)
|
||||
serializer, _ := serializers.NewInfluxSerializer()
|
||||
|
||||
|
|
@ -159,23 +163,23 @@ func runCountMultiplierProgram() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
c, found := m.GetField("count")
|
||||
c, found := m.GetField(fieldName)
|
||||
if !found {
|
||||
//nolint:errcheck,revive // Test will fail anyway
|
||||
fmt.Fprintf(os.Stderr, "metric has no count field\n")
|
||||
fmt.Fprintf(os.Stderr, "metric has no %s field\n", fieldName)
|
||||
//nolint:revive // os.Exit called intentionally
|
||||
os.Exit(1)
|
||||
}
|
||||
switch t := c.(type) {
|
||||
case float64:
|
||||
t *= 2
|
||||
m.AddField("count", t)
|
||||
m.AddField(fieldName, t)
|
||||
case int64:
|
||||
t *= 2
|
||||
m.AddField("count", t)
|
||||
m.AddField(fieldName, t)
|
||||
default:
|
||||
//nolint:errcheck,revive // Test will fail anyway
|
||||
fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
|
||||
fmt.Fprintf(os.Stderr, "%s is not an unknown type, it's a %T\n", fieldName, c)
|
||||
//nolint:revive // os.Exit called intentionally
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue