execd output (#7761)

This commit is contained in:
Steven Soroka 2020-07-02 11:59:29 -04:00 committed by GitHub
parent 1b1382cabf
commit 0efcca3c33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 539 additions and 75 deletions

View File

@ -188,7 +188,7 @@ For documentation on the latest development code see the [documentation index][d
* [ethtool](./plugins/inputs/ethtool)
* [eventhub_consumer](./plugins/inputs/eventhub_consumer) (Azure Event Hubs \& Azure IoT Hub)
* [exec](./plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios)
* [execd](./plugins/inputs/execd)
* [execd](./plugins/inputs/execd) (generic executable "daemon" processes)
* [fail2ban](./plugins/inputs/fail2ban)
* [fibaro](./plugins/inputs/fibaro)
* [file](./plugins/inputs/file)
@ -368,6 +368,7 @@ For documentation on the latest development code see the [documentation index][d
* [dedup](/plugins/processors/dedup)
* [defaults](/plugins/processors/defaults)
* [enum](/plugins/processors/enum)
* [execd](/plugins/processors/execd)
* [filepath](/plugins/processors/filepath)
* [override](/plugins/processors/override)
* [parser](/plugins/processors/parser)
@ -408,6 +409,7 @@ For documentation on the latest development code see the [documentation index][d
* [discard](./plugins/outputs/discard)
* [elasticsearch](./plugins/outputs/elasticsearch)
* [exec](./plugins/outputs/exec)
* [execd](./plugins/outputs/execd)
* [file](./plugins/outputs/file)
* [graphite](./plugins/outputs/graphite)
* [graylog](./plugins/outputs/graylog)

View File

@ -8,6 +8,7 @@ import (
"io/ioutil"
"os/exec"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
@ -24,6 +25,9 @@ type Process struct {
RestartDelay time.Duration
Log telegraf.Logger
name string
args []string
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}
@ -36,32 +40,19 @@ func New(command []string) (*Process, error) {
p := &Process{
RestartDelay: 5 * time.Second,
name: command[0],
args: []string{},
}
if len(command) > 1 {
p.Cmd = exec.Command(command[0], command[1:]...)
} else {
p.Cmd = exec.Command(command[0])
}
var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdin pipe: %w", err)
}
p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdout pipe: %w", err)
}
p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("error opening stderr pipe: %w", err)
p.args = command[1:]
}
return p, nil
}
// Start the process
// Start the process. A &Process can only be started once. It will restart itself
// as necessary.
func (p *Process) Start() error {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
@ -81,35 +72,54 @@ func (p *Process) Start() error {
return nil
}
// Stop is called when the process isn't needed anymore
func (p *Process) Stop() {
if p.cancel != nil {
// signal our intent to shutdown and not restart the process
p.cancel()
}
// close stdin so the app can shut down gracefully.
p.Stdin.Close()
p.mainLoopWg.Wait()
}
func (p *Process) cmdStart() error {
p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)
p.Cmd = exec.Command(p.name, p.args...)
var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return fmt.Errorf("error opening stdin pipe: %w", err)
}
p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("error opening stdout pipe: %w", err)
}
p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return fmt.Errorf("error opening stderr pipe: %w", err)
}
p.Log.Infof("Starting process: %s %s", p.name, p.args)
if err := p.Cmd.Start(); err != nil {
return fmt.Errorf("error starting process: %s", err)
}
atomic.StoreInt32(&p.pid, int32(p.Cmd.Process.Pid))
return nil
}
func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}
// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
go func() {
<-ctx.Done()
if p.Stdin != nil {
p.Stdin.Close()
gracefulStop(p.Cmd, 5*time.Second)
}
}()
for {
err := p.cmdWait()
err := p.cmdWait(ctx)
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
@ -130,7 +140,8 @@ func (p *Process) cmdLoop(ctx context.Context) error {
}
}
func (p *Process) cmdWait() error {
// cmdWait waits for the process to finish.
func (p *Process) cmdWait(ctx context.Context) error {
var wg sync.WaitGroup
if p.ReadStdoutFn == nil {
@ -140,6 +151,9 @@ func (p *Process) cmdWait() error {
p.ReadStderrFn = defaultReadPipe
}
processCtx, processCancel := context.WithCancel(context.Background())
defer processCancel()
wg.Add(1)
go func() {
p.ReadStdoutFn(p.Stdout)
@ -152,8 +166,20 @@ func (p *Process) cmdWait() error {
wg.Done()
}()
wg.Add(1)
go func() {
select {
case <-ctx.Done():
gracefulStop(processCtx, p.Cmd, 5*time.Second)
case <-processCtx.Done():
}
wg.Done()
}()
err := p.Cmd.Wait()
processCancel()
wg.Wait()
return p.Cmd.Wait()
return err
}
func isQuitting(ctx context.Context) bool {

View File

@ -3,26 +3,21 @@
package process
import (
"context"
"os/exec"
"syscall"
"time"
)
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}
})
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
select {
case <-time.After(timeout):
cmd.Process.Signal(syscall.SIGTERM)
case <-ctx.Done():
}
select {
case <-time.After(timeout):
cmd.Process.Kill()
case <-ctx.Done():
}
}

View File

@ -0,0 +1,74 @@
// +build !windows
package process
import (
"bufio"
"flag"
"fmt"
"io"
"os"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// test that a restarting process resets pipes properly
func TestRestartingRebindsPipes(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)
p, err := New([]string{exe, "-external"})
p.RestartDelay = 100 * time.Nanosecond
p.Log = testutil.Logger{}
require.NoError(t, err)
linesRead := int64(0)
p.ReadStdoutFn = func(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
atomic.AddInt64(&linesRead, 1)
}
}
require.NoError(t, p.Start())
for atomic.LoadInt64(&linesRead) < 1 {
time.Sleep(1 * time.Millisecond)
}
syscall.Kill(p.Pid(), syscall.SIGKILL)
for atomic.LoadInt64(&linesRead) < 2 {
time.Sleep(1 * time.Millisecond)
}
p.Stop()
}
var external = flag.Bool("external", false,
"if true, run externalProcess instead of tests")
func TestMain(m *testing.M) {
flag.Parse()
if *external {
externalProcess()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
// externalProcess is an external "misbehaving" process that won't exit
// cleanly.
func externalProcess() {
wait := make(chan int, 0)
fmt.Fprintln(os.Stdout, "started")
<-wait
os.Exit(2)
}

View File

@ -3,17 +3,15 @@
package process
import (
"context"
"os/exec"
"time"
)
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
select {
case <-time.After(timeout):
cmd.Process.Kill()
case <-ctx.Done():
}
}

View File

@ -3,18 +3,23 @@
package execd
import (
"bufio"
"flag"
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf"
)
@ -23,13 +28,16 @@ func TestExternalInputWorks(t *testing.T) {
influxParser, err := parsers.NewInfluxParser()
require.NoError(t, err)
exe, err := os.Executable()
require.NoError(t, err)
e := &Execd{
Command: []string{shell(), fileShellScriptPath()},
Command: []string{exe, "-counter"},
RestartDelay: config.Duration(5 * time.Second),
parser: influxParser,
Signal: "STDIN",
Log: testutil.Logger{},
}
e.Log = testutil.Logger{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
@ -43,12 +51,10 @@ func TestExternalInputWorks(t *testing.T) {
e.Stop()
require.Equal(t, "counter_bash", m.Name())
require.Equal(t, "counter", m.Name())
val, ok := m.GetField("count")
require.True(t, ok)
require.Equal(t, float64(0), val)
// test that a later gather will not panic
e.Gather(acc)
require.EqualValues(t, 0, val)
}
func TestParsesLinesContainingNewline(t *testing.T) {
@ -60,13 +66,12 @@ func TestParsesLinesContainingNewline(t *testing.T) {
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
e := &Execd{
Command: []string{shell(), fileShellScriptPath()},
RestartDelay: config.Duration(5 * time.Second),
parser: parser,
Signal: "STDIN",
acc: acc,
Log: testutil.Logger{},
}
e.Log = testutil.Logger{}
cases := []struct {
Name string
@ -109,14 +114,6 @@ func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout tim
return nil
}
func fileShellScriptPath() string {
return "./examples/count.sh"
}
func shell() string {
return "sh"
}
type TestMetricMaker struct{}
func (tm *TestMetricMaker) Name() string {
@ -134,3 +131,45 @@ func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
func (tm *TestMetricMaker) Log() telegraf.Logger {
return models.NewLogger("TestPlugin", "test", "")
}
var counter = flag.Bool("counter", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
flag.Parse()
if *counter {
runCounterProgram()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
func runCounterProgram() {
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load")
os.Exit(1)
}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
metric, _ := metric.New("counter",
map[string]string{},
map[string]interface{}{
"count": i,
},
time.Now(),
)
i++
b, err := serializer.Serialize(metric)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
fmt.Fprint(os.Stdout, string(b))
}
}

View File

@ -12,6 +12,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/exec"
_ "github.com/influxdata/telegraf/plugins/outputs/execd"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"

View File

@ -0,0 +1,26 @@
# Execd Output Plugin
The `execd` plugin runs an external program as a daemon.
### Configuration:
```toml
[[outputs.execd]]
## Program to run as daemon
command = ["my-telegraf-output", "--some-flag", "value"]
## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
## Data format to export.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
### Example
see [examples][]
[examples]: https://github.com/influxdata/telegraf/blob/master/plugins/outputs/execd/examples/

View File

@ -0,0 +1,5 @@
# Usage: sh file.sh output_filename.ext
# reads from stdin and writes out to a file named on the command line.
while read line; do
echo "$line" >> $1
done < /dev/stdin

View File

@ -0,0 +1,9 @@
[agent]
interval = "1s"
[[inputs.execd]]
command = ["ruby", "plugins/inputs/execd/examples/count.rb"]
[[outputs.execd]]
command = ["sh", "plugins/outputs/execd/examples/file/file.sh"]
data_format = "json"

View File

@ -0,0 +1,19 @@
#!/usr/bin/env ruby
#
# An example of funneling metrics to Redis pub/sub.
#
# to run this, you may need to:
# gem install redis
#
require 'redis'
r = Redis.new(host: "127.0.0.1", port: 6379, db: 1)
loop do
# example input: "counter_ruby count=0 1591741648101185000"
line = STDIN.readline.chomp
key = line.split(" ")[0]
key = key.split(",")[0]
r.publish(key, line)
end

View File

@ -0,0 +1,21 @@
#!/usr/bin/env ruby
#
# An example of funneling metrics to Redis pub/sub.
#
# to run this, you may need to:
# gem install redis
#
require 'redis'
require 'json'
r = Redis.new(host: "127.0.0.1", port: 6379, db: 1)
loop do
# example input: "{"fields":{"count":0},"name":"counter_ruby","tags":{"host":"localhost"},"timestamp":1586374982}"
line = STDIN.readline.chomp
l = JSON.parse(line)
key = l["name"]
r.publish(key, line)
end

View File

@ -0,0 +1,15 @@
[agent]
flush_interval = "1s"
interval = "1s"
[[inputs.execd]]
command = ["ruby", "plugins/inputs/execd/examples/count.rb"]
signal = "none"
[[outputs.execd]]
command = ["ruby", "plugins/outputs/execd/examples/redis/redis_influx.rb"]
data_format = "influx"
# [[outputs.file]]
# files = ["stdout"]
# data_format = "influx"

View File

@ -0,0 +1,121 @@
package execd
import (
"bufio"
"fmt"
"io"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
const sampleConfig = `
## Program to run as daemon
command = ["my-telegraf-output", "--some-flag", "value"]
## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
## Data format to export.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`
type Execd struct {
Command []string `toml:"command"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger
process *process.Process
serializer serializers.Serializer
}
func (e *Execd) SampleConfig() string {
return sampleConfig
}
func (e *Execd) Description() string {
return "Run executable as long-running output plugin"
}
func (e *Execd) SetSerializer(s serializers.Serializer) {
e.serializer = s
}
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return fmt.Errorf("no command specified")
}
var err error
e.process, err = process.New(e.Command)
if err != nil {
return fmt.Errorf("error creating process %s: %w", e.Command, err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStderrFn = e.cmdReadErr
return nil
}
func (e *Execd) Connect() error {
if err := e.process.Start(); err != nil {
return fmt.Errorf("failed to start process %s: %w", e.Command, err)
}
return nil
}
func (e *Execd) Close() error {
e.process.Stop()
return nil
}
func (e *Execd) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
b, err := e.serializer.Serialize(m)
if err != nil {
return fmt.Errorf("error serializing metrics: %s", err)
}
if _, err = e.process.Stdin.Write(b); err != nil {
return fmt.Errorf("error writing metrics %s", err)
}
}
return nil
}
func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
e.Log.Errorf("stderr: %s", scanner.Text())
}
if err := scanner.Err(); err != nil {
e.Log.Errorf("Error reading stderr: %s", err)
}
}
func (e *Execd) cmdReadOut(out io.Reader) {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
e.Log.Info(scanner.Text())
}
}
func init() {
outputs.Add("execd", func() telegraf.Output {
return &Execd{}
})
}

View File

@ -0,0 +1,113 @@
package execd
import (
"bufio"
"flag"
"fmt"
"io"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
func TestExternalOutputWorks(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
exe, err := os.Executable()
require.NoError(t, err)
e := &Execd{
Command: []string{exe, "-testoutput"},
RestartDelay: config.Duration(5 * time.Second),
serializer: influxSerializer,
Log: testutil.Logger{},
}
require.NoError(t, e.Init())
wg := &sync.WaitGroup{}
wg.Add(1)
e.process.ReadStderrFn = func(rstderr io.Reader) {
scanner := bufio.NewScanner(rstderr)
for scanner.Scan() {
t.Errorf("stderr: %q", scanner.Text())
}
if err := scanner.Err(); err != nil {
if !strings.HasSuffix(err.Error(), "already closed") {
t.Errorf("error reading stderr: %v", err)
}
}
wg.Done()
}
m, err := metric.New(
"cpu",
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
)
require.NoError(t, err)
require.NoError(t, e.Connect())
require.NoError(t, e.Write([]telegraf.Metric{m}))
require.NoError(t, e.Close())
wg.Wait()
}
var testoutput = flag.Bool("testoutput", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
flag.Parse()
if *testoutput {
runOutputConsumerProgram()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
func runOutputConsumerProgram() {
parser := influx.NewStreamParser(os.Stdin)
for {
metric, err := parser.Next()
if err != nil {
if err == influx.EOF {
return // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
expected := testutil.MustMetric("cpu",
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
)
if !testutil.MetricEqual(expected, metric) {
fmt.Fprintf(os.Stderr, "metric doesn't match expected\n")
os.Exit(1)
}
}
}