address feedback

This commit is contained in:
Steven Soroka 2020-06-26 16:38:07 -04:00
parent 88b09cf18c
commit 9785d3c2c6
10 changed files with 145 additions and 49 deletions

View File

@ -510,6 +510,7 @@ func (a *Agent) runProcessors(
for m := range unit.src { for m := range unit.src {
if err := unit.processor.Add(m, acc); err != nil { if err := unit.processor.Add(m, acc); err != nil {
acc.AddError(err) acc.AddError(err)
m.Drop()
} }
} }
unit.processor.Stop() unit.processor.Stop()

View File

@ -64,6 +64,81 @@ func init() {
} }
``` ```
### Streaming Processors
Streaming processors are a new processor type available to you. They are
particularly useful to implement processor types that use background processes
or goroutines to process multiple metrics at the same time. Some examples of this
are the execd processor, which pipes metrics out to an external process over stdin
and reads them back over stdout, and the reverse_dns processor, which does reverse
dns lookups on IP addresses in fields. While both of these come with a speed cost,
it would be significantly worse if you had to process one metric completely from
start to finish before handling the next metric, and thus they benefit
significantly from a streaming-pipe approach.
Some differences from classic Processors:
* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface.
* Processors should call `processors.AddStreaming` in their `init` function to register
themselves. See below for a quick example.
### Streaming Processor Example
```go
package printer
// printer.go
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
type Printer struct {
}
var sampleConfig = `
`
func (p *Printer) SampleConfig() string {
return sampleConfig
}
func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}
func (p *Printer) Init() error {
return nil
}
func (p *Printer) Start(acc telegraf.Accumulator) error {
}
func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
// print!
fmt.Println(metric.String())
// pass the metric downstream, or metric.Drop() it.
// Metric will be dropped if this function returns an error.
acc.AddMetric(metric)
return nil
}
func (p *Printer) Stop() error {
}
func init() {
processors.AddStreaming("printer", func() telegraf.StreamingProcessor {
return &Printer{}
})
}
```
[SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig
[CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle [CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle
[telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor [telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor
[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor

View File

@ -2,13 +2,15 @@ package process
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"os/exec" "os/exec"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf"
) )
// Process is a long-running process manager that will restart processes if they stop. // Process is a long-running process manager that will restart processes if they stop.
@ -20,6 +22,7 @@ type Process struct {
ReadStdoutFn func(io.Reader) ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader) ReadStderrFn func(io.Reader)
RestartDelay time.Duration RestartDelay time.Duration
Log telegraf.Logger
cancel context.CancelFunc cancel context.CancelFunc
mainLoopWg sync.WaitGroup mainLoopWg sync.WaitGroup
@ -27,6 +30,10 @@ type Process struct {
// New creates a new process wrapper // New creates a new process wrapper
func New(command []string) (*Process, error) { func New(command []string) (*Process, error) {
if len(command) == 0 {
return nil, errors.New("no command")
}
p := &Process{ p := &Process{
RestartDelay: 5 * time.Second, RestartDelay: 5 * time.Second,
} }
@ -56,8 +63,6 @@ func New(command []string) (*Process, error) {
// Start the process // Start the process
func (p *Process) Start() error { func (p *Process) Start() error {
p.mainLoopWg.Add(1)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel p.cancel = cancel
@ -65,9 +70,10 @@ func (p *Process) Start() error {
return err return err
} }
p.mainLoopWg.Add(1)
go func() { go func() {
if err := p.cmdLoop(ctx); err != nil { if err := p.cmdLoop(ctx); err != nil {
log.Printf("E! [agent] Process quit with message: %v", err) p.Log.Errorf("Process quit with message: %v", err)
} }
p.mainLoopWg.Done() p.mainLoopWg.Done()
}() }()
@ -83,10 +89,10 @@ func (p *Process) Stop() {
} }
func (p *Process) cmdStart() error { func (p *Process) cmdStart() error {
log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)
if err := p.Cmd.Start(); err != nil { if err := p.Cmd.Start(); err != nil {
return fmt.Errorf("Error starting process: %s", err) return fmt.Errorf("error starting process: %s", err)
} }
return nil return nil
@ -105,12 +111,12 @@ func (p *Process) cmdLoop(ctx context.Context) error {
for { for {
err := p.cmdWait() err := p.cmdWait()
if isQuitting(ctx) { if isQuitting(ctx) {
log.Printf("Process %s shut down", p.Cmd.Path) p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil return nil
} }
log.Printf("Process %s terminated: %v", p.Cmd.Path, err) p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err)
log.Printf("Restarting in %s...", time.Duration(p.RestartDelay)) p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay))
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -10,13 +10,13 @@ import (
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() { time.AfterFunc(timeout, func() {
if cmd == nil || cmd.ProcessState == nil { if cmd.ProcessState == nil {
return return
} }
if !cmd.ProcessState.Exited() { if !cmd.ProcessState.Exited() {
cmd.Process.Signal(syscall.SIGTERM) cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(timeout, func() { time.AfterFunc(timeout, func() {
if cmd == nil || cmd.ProcessState == nil { if cmd.ProcessState == nil {
return return
} }
if !cmd.ProcessState.Exited() { if !cmd.ProcessState.Exited() {

View File

@ -9,7 +9,7 @@ import (
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() { time.AfterFunc(timeout, func() {
if cmd == nil || cmd.ProcessState == nil { if cmd.ProcessState == nil {
return return
} }
if !cmd.ProcessState.Exited() { if !cmd.ProcessState.Exited() {

View File

@ -2,9 +2,9 @@ package execd
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"io" "io"
"log"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -43,6 +43,7 @@ type Execd struct {
Command []string Command []string
Signal string Signal string
RestartDelay config.Duration RestartDelay config.Duration
Log telegraf.Logger
process *process.Process process *process.Process
acc telegraf.Accumulator acc telegraf.Accumulator
@ -63,16 +64,12 @@ func (e *Execd) SetParser(parser parsers.Parser) {
func (e *Execd) Start(acc telegraf.Accumulator) error { func (e *Execd) Start(acc telegraf.Accumulator) error {
e.acc = acc e.acc = acc
if len(e.Command) == 0 {
return fmt.Errorf("FATAL no command specified")
}
var err error var err error
e.process, err = process.New(e.Command) e.process, err = process.New(e.Command)
if err != nil { if err != nil {
return fmt.Errorf("Error creating new process: %w", err) return fmt.Errorf("error creating new process: %w", err)
} }
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStderrFn = e.cmdReadErr e.process.ReadStderrFn = e.cmdReadErr
@ -100,7 +97,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
for scanner.Scan() { for scanner.Scan() {
metrics, err := e.parser.Parse(scanner.Bytes()) metrics, err := e.parser.Parse(scanner.Bytes())
if err != nil { if err != nil {
e.acc.AddError(fmt.Errorf("Parse error: %s", err)) e.acc.AddError(fmt.Errorf("parse error: %w", err))
} }
for _, metric := range metrics { for _, metric := range metrics {
@ -109,7 +106,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err)) e.acc.AddError(fmt.Errorf("error reading stdout: %w", err))
} }
} }
@ -140,14 +137,21 @@ func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)
for scanner.Scan() { for scanner.Scan() {
log.Printf("[inputs.execd] stderr: %q", scanner.Text()) e.Log.Errorf("stderr: %q", scanner.Text())
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err)) e.acc.AddError(fmt.Errorf("error reading stderr: %w", err))
} }
} }
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func init() { func init() {
inputs.Add("execd", func() telegraf.Input { inputs.Add("execd", func() telegraf.Input {
return &Execd{ return &Execd{

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
@ -28,6 +29,7 @@ func TestExternalInputWorks(t *testing.T) {
parser: influxParser, parser: influxParser,
Signal: "STDIN", Signal: "STDIN",
} }
e.Log = testutil.Logger{}
metrics := make(chan telegraf.Metric, 10) metrics := make(chan telegraf.Metric, 10)
defer close(metrics) defer close(metrics)
@ -64,6 +66,7 @@ func TestParsesLinesContainingNewline(t *testing.T) {
Signal: "STDIN", Signal: "STDIN",
acc: acc, acc: acc,
} }
e.Log = testutil.Logger{}
cases := []struct { cases := []struct {
Name string Name string

View File

@ -23,7 +23,8 @@ Program output on standard error is mirrored to the telegraf log.
```toml ```toml
[[processor.execd]] [[processor.execd]]
## Program to run as daemon ## Program to run as daemon
command = ["/path/to/your_program", "arg1", "arg2"] ## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]
## Delay before the process is restarted after an unexpected termination ## Delay before the process is restarted after an unexpected termination
# restart_delay = "10s" # restart_delay = "10s"

View File

@ -2,9 +2,9 @@ package execd
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"io" "io"
"log"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -16,8 +16,9 @@ import (
) )
const sampleConfig = ` const sampleConfig = `
## Program to run as daemon ## Program to run as daemon
command = ["telegraf-smartctl", "-d", "/dev/sda"] ## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]
## Delay before the process is restarted after an unexpected termination ## Delay before the process is restarted after an unexpected termination
restart_delay = "10s" restart_delay = "10s"
@ -26,6 +27,7 @@ const sampleConfig = `
type Execd struct { type Execd struct {
Command []string `toml:"command"` Command []string `toml:"command"`
RestartDelay config.Duration `toml:"restart_delay"` RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger
parserConfig *parsers.Config parserConfig *parsers.Config
parser parsers.Parser parser parsers.Parser
@ -67,15 +69,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
} }
e.acc = acc e.acc = acc
if len(e.Command) == 0 {
return fmt.Errorf("no command specified")
}
e.process, err = process.New(e.Command) e.process, err = process.New(e.Command)
if err != nil { if err != nil {
return fmt.Errorf("error creating new process: %w", err) return fmt.Errorf("error creating new process: %w", err)
} }
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStderrFn = e.cmdReadErr e.process.ReadStderrFn = e.cmdReadErr
@ -116,7 +114,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
for scanner.Scan() { for scanner.Scan() {
metrics, err := e.parser.Parse(scanner.Bytes()) metrics, err := e.parser.Parse(scanner.Bytes())
if err != nil { if err != nil {
log.Println(fmt.Errorf("Parse error: %s", err)) e.Log.Errorf("Parse error: %s", err)
} }
for _, metric := range metrics { for _, metric := range metrics {
@ -125,7 +123,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
log.Println(fmt.Errorf("Error reading stdout: %s", err)) e.Log.Errorf("Error reading stdout: %s", err)
} }
} }
@ -133,14 +131,21 @@ func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)
for scanner.Scan() { for scanner.Scan() {
log.Printf("stderr: %q", scanner.Text()) e.Log.Errorf("stderr: %q", scanner.Text())
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
log.Println(fmt.Errorf("Error reading stderr: %s", err)) e.Log.Errorf("Error reading stderr: %s", err)
} }
} }
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func init() { func init() {
processors.AddStreaming("execd", func() telegraf.StreamingProcessor { processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
return New() return New()

View File

@ -18,6 +18,8 @@ import (
func TestExternalProcessorWorks(t *testing.T) { func TestExternalProcessorWorks(t *testing.T) {
e := New() e := New()
e.Log = testutil.Logger{}
exe, err := os.Executable() exe, err := os.Executable()
require.NoError(t, err) require.NoError(t, err)
t.Log(exe) t.Log(exe)
@ -29,6 +31,7 @@ func TestExternalProcessorWorks(t *testing.T) {
require.NoError(t, e.Start(acc)) require.NoError(t, e.Start(acc))
now := time.Now() now := time.Now()
orig := now
metrics := []telegraf.Metric{} metrics := []telegraf.Metric{}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
m, err := metric.New("test", m, err := metric.New("test",
@ -52,19 +55,17 @@ func TestExternalProcessorWorks(t *testing.T) {
require.NoError(t, e.Stop()) require.NoError(t, e.Stop())
require.Equal(t, "test", m.Name()) expected := testutil.MustMetric("test",
map[string]string{
city, ok := m.Tags()["city"] "city": "Toronto",
require.True(t, ok) },
require.EqualValues(t, "Toronto", city) map[string]interface{}{
"population": 6000000,
val, ok := m.Fields()["population"] "count": 2,
require.True(t, ok) },
require.EqualValues(t, 6000000, val) orig,
)
val, ok = m.Fields()["count"] testutil.RequireMetricEqual(t, expected, m)
require.True(t, ok)
require.EqualValues(t, 2, val)
metricTime := m.Time().UnixNano() metricTime := m.Time().UnixNano()