diff --git a/agent/agent.go b/agent/agent.go index a48ed590a..0ff589378 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -510,6 +510,7 @@ func (a *Agent) runProcessors( for m := range unit.src { if err := unit.processor.Add(m, acc); err != nil { acc.AddError(err) + m.Drop() } } unit.processor.Stop() diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 6ea82fdae..45a4eb277 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -64,6 +64,81 @@ func init() { } ``` +### Streaming Processors + +Streaming processors are a new processor type available to you. They are +particularly useful to implement processor types that use background processes +or goroutines to process multiple metrics at the same time. Some examples of this +are the execd processor, which pipes metrics out to an external process over stdin +and reads them back over stdout, and the reverse_dns processor, which does reverse +dns lookups on IP addresses in fields. While both of these come with a speed cost, +it would be significantly worse if you had to process one metric completely from +start to finish before handling the next metric, and thus they benefit +significantly from a streaming-pipe approach. + +Some differences from classic Processors: + +* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface. +* Processors should call `processors.AddStreaming` in their `init` function to register + themselves. See below for a quick example. + +### Streaming Processor Example + +```go +package printer + +// printer.go + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Init() error { + return nil +} + +func (p *Printer) Start(acc telegraf.Accumulator) error { +} + +func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + // print! + fmt.Println(metric.String()) + + // pass the metric downstream, or metric.Drop() it. + // Metric will be dropped if this function returns an error. + acc.AddMetric(metric) + + return nil +} + +func (p *Printer) Stop() error { +} + +func init() { + processors.AddStreaming("printer", func() telegraf.StreamingProcessor { + return &Printer{} + }) +} +``` + [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig [CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle [telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor +[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor diff --git a/internal/process/process.go b/internal/process/process.go index 371c2cd70..b7fd77b92 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -2,13 +2,15 @@ package process import ( "context" + "errors" "fmt" "io" "io/ioutil" - "log" "os/exec" "sync" "time" + + "github.com/influxdata/telegraf" ) // Process is a long-running process manager that will restart processes if they stop. @@ -20,6 +22,7 @@ type Process struct { ReadStdoutFn func(io.Reader) ReadStderrFn func(io.Reader) RestartDelay time.Duration + Log telegraf.Logger cancel context.CancelFunc mainLoopWg sync.WaitGroup @@ -27,6 +30,10 @@ type Process struct { // New creates a new process wrapper func New(command []string) (*Process, error) { + if len(command) == 0 { + return nil, errors.New("no command") + } + p := &Process{ RestartDelay: 5 * time.Second, } @@ -56,8 +63,6 @@ func New(command []string) (*Process, error) { // Start the process func (p *Process) Start() error { - p.mainLoopWg.Add(1) - ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel @@ -65,9 +70,10 @@ func (p *Process) Start() error { return err } + p.mainLoopWg.Add(1) go func() { if err := p.cmdLoop(ctx); err != nil { - log.Printf("E! [agent] Process quit with message: %v", err) + p.Log.Errorf("Process quit with message: %v", err) } p.mainLoopWg.Done() }() @@ -83,10 +89,10 @@ func (p *Process) Stop() { } func (p *Process) cmdStart() error { - log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) + p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) if err := p.Cmd.Start(); err != nil { - return fmt.Errorf("Error starting process: %s", err) + return fmt.Errorf("error starting process: %s", err) } return nil @@ -105,12 +111,12 @@ func (p *Process) cmdLoop(ctx context.Context) error { for { err := p.cmdWait() if isQuitting(ctx) { - log.Printf("Process %s shut down", p.Cmd.Path) + p.Log.Infof("Process %s shut down", p.Cmd.Path) return nil } - log.Printf("Process %s terminated: %v", p.Cmd.Path, err) - log.Printf("Restarting in %s...", time.Duration(p.RestartDelay)) + p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err) + p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay)) select { case <-ctx.Done(): diff --git a/internal/process/process_posix.go b/internal/process/process_posix.go index ab8342422..f459e00e2 100644 --- a/internal/process/process_posix.go +++ b/internal/process/process_posix.go @@ -10,13 +10,13 @@ import ( func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { cmd.Process.Signal(syscall.SIGTERM) time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { diff --git a/internal/process/process_windows.go b/internal/process/process_windows.go index 55b78f881..fc1108415 100644 --- a/internal/process/process_windows.go +++ b/internal/process/process_windows.go @@ -9,7 +9,7 @@ import ( func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index f44f7648e..00479cb3e 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -2,9 +2,9 @@ package execd import ( "bufio" + "errors" "fmt" "io" - "log" "time" "github.com/influxdata/telegraf" @@ -43,6 +43,7 @@ type Execd struct { Command []string Signal string RestartDelay config.Duration + Log telegraf.Logger process *process.Process acc telegraf.Accumulator @@ -63,16 +64,12 @@ func (e *Execd) SetParser(parser parsers.Parser) { func (e *Execd) Start(acc telegraf.Accumulator) error { e.acc = acc - if len(e.Command) == 0 { - return fmt.Errorf("FATAL no command specified") - } - var err error e.process, err = process.New(e.Command) if err != nil { - return fmt.Errorf("Error creating new process: %w", err) + return fmt.Errorf("error creating new process: %w", err) } - + e.process.Log = e.Log e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStderrFn = e.cmdReadErr @@ -100,7 +97,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - e.acc.AddError(fmt.Errorf("Parse error: %s", err)) + e.acc.AddError(fmt.Errorf("parse error: %w", err)) } for _, metric := range metrics { @@ -109,7 +106,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stdout: %w", err)) } } @@ -140,14 +137,21 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("[inputs.execd] stderr: %q", scanner.Text()) + e.Log.Errorf("stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stderr: %w", err)) } } +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + func init() { inputs.Add("execd", func() telegraf.Input { return &Execd{ diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 1cdbfdc5f..ce046568c 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/parsers" @@ -28,6 +29,7 @@ func TestExternalInputWorks(t *testing.T) { parser: influxParser, Signal: "STDIN", } + e.Log = testutil.Logger{} metrics := make(chan telegraf.Metric, 10) defer close(metrics) @@ -64,6 +66,7 @@ func TestParsesLinesContainingNewline(t *testing.T) { Signal: "STDIN", acc: acc, } + e.Log = testutil.Logger{} cases := []struct { Name string diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md index 5e779521a..f1fdb0b85 100644 --- a/plugins/processors/execd/README.md +++ b/plugins/processors/execd/README.md @@ -23,7 +23,8 @@ Program output on standard error is mirrored to the telegraf log. ```toml [[processor.execd]] ## Program to run as daemon - command = ["/path/to/your_program", "arg1", "arg2"] + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] ## Delay before the process is restarted after an unexpected termination # restart_delay = "10s" diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index f2026fd0e..5e4bbc53f 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -2,9 +2,9 @@ package execd import ( "bufio" + "errors" "fmt" "io" - "log" "time" "github.com/influxdata/telegraf" @@ -16,8 +16,9 @@ import ( ) const sampleConfig = ` - ## Program to run as daemon - command = ["telegraf-smartctl", "-d", "/dev/sda"] + ## Program to run as daemon + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] ## Delay before the process is restarted after an unexpected termination restart_delay = "10s" @@ -26,6 +27,7 @@ const sampleConfig = ` type Execd struct { Command []string `toml:"command"` RestartDelay config.Duration `toml:"restart_delay"` + Log telegraf.Logger parserConfig *parsers.Config parser parsers.Parser @@ -67,15 +69,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { } e.acc = acc - if len(e.Command) == 0 { - return fmt.Errorf("no command specified") - } - e.process, err = process.New(e.Command) if err != nil { return fmt.Errorf("error creating new process: %w", err) } - + e.process.Log = e.Log e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStderrFn = e.cmdReadErr @@ -116,7 +114,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - log.Println(fmt.Errorf("Parse error: %s", err)) + e.Log.Errorf("Parse error: %s", err) } for _, metric := range metrics { @@ -125,7 +123,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - log.Println(fmt.Errorf("Error reading stdout: %s", err)) + e.Log.Errorf("Error reading stdout: %s", err) } } @@ -133,14 +131,21 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("stderr: %q", scanner.Text()) + e.Log.Errorf("stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { - log.Println(fmt.Errorf("Error reading stderr: %s", err)) + e.Log.Errorf("Error reading stderr: %s", err) } } +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + func init() { processors.AddStreaming("execd", func() telegraf.StreamingProcessor { return New() diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index d25dece64..669d6601c 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -18,6 +18,8 @@ import ( func TestExternalProcessorWorks(t *testing.T) { e := New() + e.Log = testutil.Logger{} + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) @@ -29,6 +31,7 @@ func TestExternalProcessorWorks(t *testing.T) { require.NoError(t, e.Start(acc)) now := time.Now() + orig := now metrics := []telegraf.Metric{} for i := 0; i < 10; i++ { m, err := metric.New("test", @@ -52,19 +55,17 @@ func TestExternalProcessorWorks(t *testing.T) { require.NoError(t, e.Stop()) - require.Equal(t, "test", m.Name()) - - city, ok := m.Tags()["city"] - require.True(t, ok) - require.EqualValues(t, "Toronto", city) - - val, ok := m.Fields()["population"] - require.True(t, ok) - require.EqualValues(t, 6000000, val) - - val, ok = m.Fields()["count"] - require.True(t, ok) - require.EqualValues(t, 2, val) + expected := testutil.MustMetric("test", + map[string]string{ + "city": "Toronto", + }, + map[string]interface{}{ + "population": 6000000, + "count": 2, + }, + orig, + ) + testutil.RequireMetricEqual(t, expected, m) metricTime := m.Time().UnixNano()