execd processor
This commit is contained in:
parent
a3aaa2f7bb
commit
36e584e92f
|
|
@ -0,0 +1,159 @@
|
|||
package process
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Process supervises a long-running child process, restarting it whenever it
// stops.
type Process struct {
	// Cmd is the command being supervised.
	Cmd *exec.Cmd

	// Stdin is the write end of the pipe attached to the child's standard
	// input; Stdout and Stderr are the read ends of its output pipes.
	Stdin          io.WriteCloser
	Stdout, Stderr io.ReadCloser

	// ReadStdoutFn and ReadStderrFn consume the child's output streams. When
	// left nil, cmdWait installs a reader that discards the stream.
	ReadStdoutFn, ReadStderrFn func(io.Reader)

	// RestartDelay is the pause between the child terminating and the next
	// start attempt.
	RestartDelay time.Duration

	cancel     context.CancelFunc // cancels the supervising loop; set by Start
	mainLoopWg sync.WaitGroup     // tracks the supervising goroutine
}
|
||||
|
||||
// New creates a new process wrapper
|
||||
func New(command []string) (*Process, error) {
|
||||
p := &Process{
|
||||
RestartDelay: 5 * time.Second,
|
||||
}
|
||||
if len(command) > 1 {
|
||||
p.Cmd = exec.Command(command[0], command[1:]...)
|
||||
} else {
|
||||
p.Cmd = exec.Command(command[0])
|
||||
}
|
||||
var err error
|
||||
p.Stdin, err = p.Cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening stdin pipe: %w", err)
|
||||
}
|
||||
|
||||
p.Stdout, err = p.Cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
p.Stderr, err = p.Cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Start the process
|
||||
func (p *Process) Start() error {
|
||||
p.mainLoopWg.Add(1)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
p.cancel = cancel
|
||||
|
||||
if err := p.cmdStart(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := p.cmdLoop(ctx); err != nil {
|
||||
log.Printf("E! [agent] Process quit with message: %v", err)
|
||||
}
|
||||
p.mainLoopWg.Done()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Process) Stop() {
|
||||
if p.cancel != nil {
|
||||
p.cancel()
|
||||
}
|
||||
p.mainLoopWg.Wait()
|
||||
}
|
||||
|
||||
func (p *Process) cmdStart() error {
|
||||
log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)
|
||||
|
||||
if err := p.Cmd.Start(); err != nil {
|
||||
return fmt.Errorf("Error starting process: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cmdLoop watches an already running process, restarting it when appropriate.
|
||||
func (p *Process) cmdLoop(ctx context.Context) error {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
if p.Stdin != nil {
|
||||
p.Stdin.Close()
|
||||
gracefulStop(p.Cmd, 5*time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
err := p.cmdWait()
|
||||
if isQuitting(ctx) {
|
||||
log.Printf("Process %s shut down", p.Cmd.Path)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("Process %s terminated: %v", p.Cmd.Path, err)
|
||||
log.Printf("Restarting in %s...", time.Duration(p.RestartDelay))
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-time.After(time.Duration(p.RestartDelay)):
|
||||
// Continue the loop and restart the process
|
||||
if err := p.cmdStart(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Process) cmdWait() error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if p.ReadStdoutFn == nil {
|
||||
p.ReadStdoutFn = defaultReadPipe
|
||||
}
|
||||
if p.ReadStderrFn == nil {
|
||||
p.ReadStderrFn = defaultReadPipe
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
p.ReadStdoutFn(p.Stdout)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
p.ReadStderrFn(p.Stderr)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return p.Cmd.Wait()
|
||||
}
|
||||
|
||||
// isQuitting reports whether ctx has already been cancelled, without blocking.
func isQuitting(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}
|
||||
|
||||
// defaultReadPipe drains r until it is exhausted, discarding everything read.
func defaultReadPipe(r io.Reader) {
	// The byte count and error are deliberately ignored: the only purpose is
	// to keep the stream flowing.
	_, _ = io.Copy(ioutil.Discard, r)
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
// +build !windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// gracefulStop schedules an escalating shutdown of cmd's process and returns
// immediately. After timeout the process is sent SIGTERM; if it is still
// around after a further timeout it is killed.
//
// A nil cmd or a command that was never started is ignored.
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
	time.AfterFunc(timeout, func() {
		if cmd == nil || cmd.Process == nil {
			return
		}
		// cmd.ProcessState is only populated once Wait has returned, so it
		// cannot be used to detect a process that is still running. Rely on
		// Signal instead: it reports an error once the process has finished
		// and been waited for, in which case there is nothing left to stop.
		if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
			return
		}
		time.AfterFunc(timeout, func() {
			// Kill returns an error if the process is already gone; that
			// outcome is fine and deliberately ignored.
			_ = cmd.Process.Kill()
		})
	})
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
// +build windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
// gracefulStop schedules cmd's process to be killed once timeout has elapsed
// and returns immediately. A nil cmd or a command that was never started is
// ignored.
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
	time.AfterFunc(timeout, func() {
		if cmd == nil || cmd.Process == nil {
			return
		}
		// cmd.ProcessState is only populated once Wait has returned, so it
		// cannot tell whether the process is still running. Kill reports an
		// error when the process has already finished; that outcome is fine
		// and deliberately ignored.
		_ = cmd.Process.Kill()
	})
}
|
||||
|
|
@ -2,16 +2,14 @@ package execd
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal/process"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
|
|
@ -46,14 +44,9 @@ type Execd struct {
|
|||
Signal string
|
||||
RestartDelay config.Duration
|
||||
|
||||
acc telegraf.Accumulator
|
||||
cmd *exec.Cmd
|
||||
parser parsers.Parser
|
||||
stdin io.WriteCloser
|
||||
stdout io.ReadCloser
|
||||
stderr io.ReadCloser
|
||||
cancel context.CancelFunc
|
||||
mainLoopWg sync.WaitGroup
|
||||
process *process.Process
|
||||
acc telegraf.Accumulator
|
||||
parser parsers.Parser
|
||||
}
|
||||
|
||||
func (e *Execd) SampleConfig() string {
|
||||
|
|
@ -70,131 +63,29 @@ func (e *Execd) SetParser(parser parsers.Parser) {
|
|||
|
||||
func (e *Execd) Start(acc telegraf.Accumulator) error {
|
||||
e.acc = acc
|
||||
|
||||
if len(e.Command) == 0 {
|
||||
return fmt.Errorf("FATAL no command specified")
|
||||
}
|
||||
|
||||
e.mainLoopWg.Add(1)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
e.cancel = cancel
|
||||
|
||||
if err := e.cmdStart(); err != nil {
|
||||
return err
|
||||
var err error
|
||||
e.process, err = process.New(e.Command)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error creating new process: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := e.cmdLoop(ctx); err != nil {
|
||||
log.Printf("Process quit with message: %s", err.Error())
|
||||
}
|
||||
e.mainLoopWg.Done()
|
||||
}()
|
||||
e.process.RestartDelay = time.Duration(e.RestartDelay)
|
||||
e.process.ReadStdoutFn = e.cmdReadOut
|
||||
e.process.ReadStderrFn = e.cmdReadErr
|
||||
|
||||
if err = e.process.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start process %s: %w", e.Command, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Execd) Stop() {
|
||||
// don't try to stop before all stream readers have started.
|
||||
e.cancel()
|
||||
e.mainLoopWg.Wait()
|
||||
}
|
||||
|
||||
// cmdLoop watches an already running process, restarting it when appropriate.
|
||||
func (e *Execd) cmdLoop(ctx context.Context) error {
|
||||
for {
|
||||
// Use a buffered channel to ensure goroutine below can exit
|
||||
// if `ctx.Done` is selected and nothing reads on `done` anymore
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- e.cmdWait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if e.stdin != nil {
|
||||
e.stdin.Close()
|
||||
gracefulStop(e.cmd, 5*time.Second)
|
||||
}
|
||||
return nil
|
||||
case err := <-done:
|
||||
log.Printf("Process %s terminated: %s", e.Command, err)
|
||||
if isQuitting(ctx) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Restarting in %s...", time.Duration(e.RestartDelay))
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-time.After(time.Duration(e.RestartDelay)):
|
||||
// Continue the loop and restart the process
|
||||
if err := e.cmdStart(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isQuitting(ctx context.Context) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Execd) cmdStart() (err error) {
|
||||
if len(e.Command) > 1 {
|
||||
e.cmd = exec.Command(e.Command[0], e.Command[1:]...)
|
||||
} else {
|
||||
e.cmd = exec.Command(e.Command[0])
|
||||
}
|
||||
|
||||
e.stdin, err = e.cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error opening stdin pipe: %s", err)
|
||||
}
|
||||
|
||||
e.stdout, err = e.cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error opening stdout pipe: %s", err)
|
||||
}
|
||||
|
||||
e.stderr, err = e.cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error opening stderr pipe: %s", err)
|
||||
}
|
||||
|
||||
log.Printf("Starting process: %s", e.Command)
|
||||
|
||||
err = e.cmd.Start()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error starting process: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Execd) cmdWait() error {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
e.cmdReadOut(e.stdout)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
e.cmdReadErr(e.stderr)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return e.cmd.Wait()
|
||||
e.process.Stop()
|
||||
}
|
||||
|
||||
func (e *Execd) cmdReadOut(out io.Reader) {
|
||||
|
|
@ -249,7 +140,7 @@ func (e *Execd) cmdReadErr(out io.Reader) {
|
|||
scanner := bufio.NewScanner(out)
|
||||
|
||||
for scanner.Scan() {
|
||||
log.Printf("stderr: %q", scanner.Text())
|
||||
log.Printf("[inputs.execd] stderr: %q", scanner.Text())
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
|
@ -14,22 +13,26 @@ import (
|
|||
)
|
||||
|
||||
func (e *Execd) Gather(acc telegraf.Accumulator) error {
|
||||
if e.cmd == nil || e.cmd.Process == nil {
|
||||
if e.process == nil || e.process.Cmd == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
osProcess := e.process.Cmd.Process
|
||||
if osProcess == nil {
|
||||
return nil
|
||||
}
|
||||
switch e.Signal {
|
||||
case "SIGHUP":
|
||||
e.cmd.Process.Signal(syscall.SIGHUP)
|
||||
osProcess.Signal(syscall.SIGHUP)
|
||||
case "SIGUSR1":
|
||||
e.cmd.Process.Signal(syscall.SIGUSR1)
|
||||
osProcess.Signal(syscall.SIGUSR1)
|
||||
case "SIGUSR2":
|
||||
e.cmd.Process.Signal(syscall.SIGUSR2)
|
||||
osProcess.Signal(syscall.SIGUSR2)
|
||||
case "STDIN":
|
||||
if osStdin, ok := e.stdin.(*os.File); ok {
|
||||
if osStdin, ok := e.process.Stdin.(*os.File); ok {
|
||||
osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
}
|
||||
if _, err := io.WriteString(e.stdin, "\n"); err != nil {
|
||||
if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil {
|
||||
return fmt.Errorf("Error writing to stdin: %s", err)
|
||||
}
|
||||
case "none":
|
||||
|
|
@ -39,11 +42,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
go func() {
|
||||
<-time.NewTimer(timeout).C
|
||||
cmd.Process.Kill()
|
||||
}()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,13 +19,13 @@ import (
|
|||
)
|
||||
|
||||
func TestExternalInputWorks(t *testing.T) {
|
||||
jsonParser, err := parsers.NewInfluxParser()
|
||||
influxParser, err := parsers.NewInfluxParser()
|
||||
require.NoError(t, err)
|
||||
|
||||
e := &Execd{
|
||||
Command: []string{shell(), fileShellScriptPath()},
|
||||
RestartDelay: config.Duration(5 * time.Second),
|
||||
parser: jsonParser,
|
||||
parser: influxParser,
|
||||
Signal: "STDIN",
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,23 +6,22 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
func (e *Execd) Gather(acc telegraf.Accumulator) error {
|
||||
if e.cmd == nil || e.cmd.Process == nil {
|
||||
if e.process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch e.Signal {
|
||||
case "STDIN":
|
||||
if osStdin, ok := e.stdin.(*os.File); ok {
|
||||
if osStdin, ok := e.process.Stdin.(*os.File); ok {
|
||||
osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
}
|
||||
if _, err := io.WriteString(e.stdin, "\n"); err != nil {
|
||||
if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil {
|
||||
return fmt.Errorf("Error writing to stdin: %s", err)
|
||||
}
|
||||
case "none":
|
||||
|
|
@ -32,7 +31,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
|
||||
cmd.Process.Kill()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/processors/dedup"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/defaults"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/enum"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/execd"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/filepath"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/parser"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,109 @@
|
|||
# Execd Processor Plugin
|
||||
|
||||
The `execd` processor plugin runs an external program as a separate process and
|
||||
pipes metrics in to the process's STDIN and reads processed metrics from its STDOUT.
|
||||
The programs must accept influx line protocol on standard in (STDIN) and output
|
||||
metrics in influx line protocol to standard output (STDOUT).
|
||||
|
||||
Program output on standard error is mirrored to the telegraf log.
|
||||
|
||||
### Caveats
|
||||
|
||||
- Metrics with tracking will be considered "delivered" as soon as they are passed
|
||||
to the external process. There is currently no way to match up which metric
|
||||
coming out of the execd process relates to which metric going in (keep in mind
|
||||
that processors can add and drop metrics, and that this is all done
|
||||
asynchronously).
|
||||
- It's not currently possible to use a data_format other than "influx", due to
|
||||
the requirement that it is serialize-parse symmetrical and does not lose any
|
||||
critical type data.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
[[processors.execd]]
|
||||
## Program to run as daemon
|
||||
command = ["/path/to/your_program", "arg1", "arg2"]
|
||||
|
||||
## Delay before the process is restarted after an unexpected termination
|
||||
# restart_delay = "10s"
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
#### Go daemon example
|
||||
|
||||
This go daemon reads a metric from stdin, multiplies the "count" field by 2,
|
||||
and writes the metric back out.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
)
|
||||
|
||||
func main() {
|
||||
parser := influx.NewStreamParser(os.Stdin)
|
||||
serializer, _ := serializers.NewInfluxSerializer()
|
||||
|
||||
for {
|
||||
metric, err := parser.Next()
|
||||
if err != nil {
|
||||
if err == influx.EOF {
|
||||
return // stream ended
|
||||
}
|
||||
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
|
||||
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c, found := metric.GetField("count")
|
||||
if !found {
|
||||
fmt.Fprintf(os.Stderr, "metric has no count field\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
switch t := c.(type) {
|
||||
case float64:
|
||||
t *= 2
|
||||
metric.AddField("count", t)
|
||||
case int64:
|
||||
t *= 2
|
||||
metric.AddField("count", t)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "count is not a known type, it's a %T\n", c)
|
||||
os.Exit(1)
|
||||
}
|
||||
b, err := serializer.Serialize(metric)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprint(os.Stdout, string(b))
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
To run it, build the binary using Go, e.g. `go build -o multiplier.exe main.go`, and configure it as follows:
|
||||
|
||||
```toml
|
||||
[[processors.execd]]
|
||||
command = ["multiplier.exe"]
|
||||
```
|
||||
|
||||
#### Ruby daemon reading from STDIN
|
||||
|
||||
- See [Ruby daemon](./examples/multiplier_line_protocol/multiplier_line_protocol.rb)
|
||||
|
||||
```toml
|
||||
[[processors.execd]]
|
||||
command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"]
|
||||
```
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package main
|
||||
|
||||
// Example using HUP signaling
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// main prints one "counter_go" line-protocol metric to stdout each time the
// process receives SIGHUP, incrementing the reported count afterwards.
func main() {
	hup := make(chan os.Signal, 1)
	signal.Notify(hup, syscall.SIGHUP)

	for count := 0; ; count++ {
		<-hup

		fmt.Printf("counter_go count=%d\n", count)
	}
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[agent]
|
||||
interval = "10s"
|
||||
|
||||
[[inputs.execd]]
|
||||
command = ["ruby", "plugins/inputs/execd/examples/count.rb"]
|
||||
|
||||
[[processors.execd]]
|
||||
command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"]
|
||||
|
||||
|
||||
[[outputs.file]]
|
||||
files = ["stdout"]
|
||||
data_format = "influx"
|
||||
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
#!/usr/bin/env ruby
|
||||
|
||||
# Reads influx line protocol from STDIN one line at a time, doubles the "count"
# field and writes the resulting line to STDOUT.
#
# each_line simply stops at end of input (readline would raise EOFError), so
# the script exits quietly once the writing side of the pipe is closed.
STDIN.each_line do |raw|
  # example input: "counter_ruby count=0 1586302128978187000"
  line = raw.chomp
  # parse out influx line protocol sections with a really simple hand-rolled parser that doesn't support escaping.
  # for a full line parser in ruby, check out something like the influxdb-lineprotocol-parser gem.
  parts = line.split(" ")
  case parts.size
  when 3
    measurement, fields, timestamp = parts
  when 4
    measurement, tags, fields, timestamp = parts
  else
    STDERR.puts "Unable to parse line protocol"
    exit 1
  end
  fields = fields.split(",").map{|t|
    k,v = t.split("=")
    if k == "count"
      v = v.to_i * 2 # multiply count metric by two
    end
    "#{k}=#{v}"
  }.join(",")
  # tags is nil for three-part lines and is filtered out here
  puts [measurement, tags, fields, timestamp].select{|s| s && s.size != 0 }.join(" ")
  # flush after every line so the reader sees each metric immediately
  STDOUT.flush
end
|
||||
|
|
@ -0,0 +1,149 @@
|
|||
package execd
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal/process"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
)
|
||||
|
||||
const sampleConfig = `
|
||||
## Program to run as daemon
|
||||
command = ["telegraf-smartctl", "-d", "/dev/sda"]
|
||||
|
||||
## Delay before the process is restarted after an unexpected termination
|
||||
restart_delay = "10s"
|
||||
`
|
||||
|
||||
type Execd struct {
|
||||
Command []string `toml:"command"`
|
||||
RestartDelay config.Duration `toml:"restart_delay"`
|
||||
|
||||
parserConfig *parsers.Config
|
||||
parser parsers.Parser
|
||||
serializerConfig *serializers.Config
|
||||
serializer serializers.Serializer
|
||||
acc telegraf.Accumulator
|
||||
process *process.Process
|
||||
}
|
||||
|
||||
func New() *Execd {
|
||||
return &Execd{
|
||||
RestartDelay: config.Duration(10 * time.Second),
|
||||
parserConfig: &parsers.Config{
|
||||
DataFormat: "influx",
|
||||
},
|
||||
serializerConfig: &serializers.Config{
|
||||
DataFormat: "influx",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Execd) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (e *Execd) Description() string {
|
||||
return "Run executable as long-running processor plugin"
|
||||
}
|
||||
|
||||
func (e *Execd) Start(acc telegraf.Accumulator) error {
|
||||
var err error
|
||||
e.parser, err = parsers.NewParser(e.parserConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating parser: %w", err)
|
||||
}
|
||||
e.serializer, err = serializers.NewSerializer(e.serializerConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating serializer: %w", err)
|
||||
}
|
||||
e.acc = acc
|
||||
|
||||
if len(e.Command) == 0 {
|
||||
return fmt.Errorf("no command specified")
|
||||
}
|
||||
|
||||
e.process, err = process.New(e.Command)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating new process: %w", err)
|
||||
}
|
||||
|
||||
e.process.RestartDelay = time.Duration(e.RestartDelay)
|
||||
e.process.ReadStdoutFn = e.cmdReadOut
|
||||
e.process.ReadStderrFn = e.cmdReadErr
|
||||
|
||||
if err = e.process.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start process %s: %w", e.Command, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) {
|
||||
b, err := e.serializer.Serialize(m)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("metric serializing error: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
_, err = e.process.Stdin.Write(b)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("error writing to process stdin: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// We cannot maintain tracking metrics at the moment because input/output
|
||||
// is done asynchronously and we don't have any metric metadata to tie the
|
||||
// output metric back to the original input metric.
|
||||
m.Drop()
|
||||
}
|
||||
|
||||
func (e *Execd) Stop() error {
|
||||
e.process.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Execd) cmdReadOut(out io.Reader) {
|
||||
scanner := bufio.NewScanner(out)
|
||||
|
||||
for scanner.Scan() {
|
||||
metrics, err := e.parser.Parse(scanner.Bytes())
|
||||
if err != nil {
|
||||
log.Println(fmt.Errorf("Parse error: %s", err))
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
e.acc.AddMetric(metric)
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
log.Println(fmt.Errorf("Error reading stdout: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Execd) cmdReadErr(out io.Reader) {
|
||||
scanner := bufio.NewScanner(out)
|
||||
|
||||
for scanner.Scan() {
|
||||
log.Printf("stderr: %q", scanner.Text())
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
log.Println(fmt.Errorf("Error reading stderr: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
|
||||
return New()
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,135 @@
|
|||
package execd
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExternalProcessorWorks(t *testing.T) {
|
||||
e := New()
|
||||
exe, err := os.Executable()
|
||||
require.NoError(t, err)
|
||||
t.Log(exe)
|
||||
e.Command = []string{exe, "-countmultiplier"}
|
||||
e.RestartDelay = config.Duration(5 * time.Second)
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
|
||||
require.NoError(t, e.Start(acc))
|
||||
|
||||
now := time.Now()
|
||||
metrics := []telegraf.Metric{}
|
||||
for i := 0; i < 10; i++ {
|
||||
m, err := metric.New("test",
|
||||
map[string]string{
|
||||
"city": "Toronto",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"population": 6000000,
|
||||
"count": 1,
|
||||
},
|
||||
now)
|
||||
require.NoError(t, err)
|
||||
metrics = append(metrics, m)
|
||||
now = now.Add(1)
|
||||
|
||||
e.Add(m, acc)
|
||||
}
|
||||
|
||||
acc.Wait(1)
|
||||
m := acc.GetTelegrafMetrics()[0]
|
||||
|
||||
require.NoError(t, e.Stop())
|
||||
|
||||
require.Equal(t, "test", m.Name())
|
||||
|
||||
city, ok := m.Tags()["city"]
|
||||
require.True(t, ok)
|
||||
require.EqualValues(t, "Toronto", city)
|
||||
|
||||
val, ok := m.Fields()["population"]
|
||||
require.True(t, ok)
|
||||
require.EqualValues(t, 6000000, val)
|
||||
|
||||
val, ok = m.Fields()["count"]
|
||||
require.True(t, ok)
|
||||
require.EqualValues(t, 2, val)
|
||||
|
||||
metricTime := m.Time().UnixNano()
|
||||
|
||||
// read the other 9 and make sure they're ordered properly
|
||||
acc.Wait(9)
|
||||
metrics = acc.GetTelegrafMetrics()
|
||||
for i := 0; i < 9; i++ {
|
||||
m = metrics[i+1]
|
||||
require.EqualValues(t, metricTime+1, m.Time().UnixNano())
|
||||
metricTime = m.Time().UnixNano()
|
||||
}
|
||||
}
|
||||
|
||||
var countmultiplier = flag.Bool("countmultiplier", false,
|
||||
"if true, act like line input program instead of test")
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
if *countmultiplier {
|
||||
runCountMultiplierProgram()
|
||||
os.Exit(0)
|
||||
}
|
||||
code := m.Run()
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
func runCountMultiplierProgram() {
|
||||
parser := influx.NewStreamParser(os.Stdin)
|
||||
serializer, _ := serializers.NewInfluxSerializer()
|
||||
|
||||
for {
|
||||
metric, err := parser.Next()
|
||||
if err != nil {
|
||||
if err == influx.EOF {
|
||||
return // stream ended
|
||||
}
|
||||
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
|
||||
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c, found := metric.GetField("count")
|
||||
if !found {
|
||||
fmt.Fprintf(os.Stderr, "metric has no count field\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
switch t := c.(type) {
|
||||
case float64:
|
||||
t *= 2
|
||||
metric.AddField("count", t)
|
||||
case int64:
|
||||
t *= 2
|
||||
metric.AddField("count", t)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
|
||||
os.Exit(1)
|
||||
}
|
||||
b, err := serializer.Serialize(metric)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Fprint(os.Stdout, string(b))
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue