Execd processor (#7640)

Steven Soroka 2020-06-26 18:18:19 -04:00 committed by GitHub
commit d75ca67e47
16 changed files with 700 additions and 163 deletions


@ -510,6 +510,7 @@ func (a *Agent) runProcessors(
for m := range unit.src {
if err := unit.processor.Add(m, acc); err != nil {
acc.AddError(err)
m.Drop()
}
}
unit.processor.Stop()

internal/process/process.go (new file)

@ -0,0 +1,165 @@
package process
import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os/exec"
"sync"
"time"
"github.com/influxdata/telegraf"
)
// Process is a long-running process manager that will restart processes if they stop.
type Process struct {
Cmd *exec.Cmd
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader)
RestartDelay time.Duration
Log telegraf.Logger
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}
// New creates a new process wrapper
func New(command []string) (*Process, error) {
if len(command) == 0 {
return nil, errors.New("no command")
}
p := &Process{
RestartDelay: 5 * time.Second,
}
if len(command) > 1 {
p.Cmd = exec.Command(command[0], command[1:]...)
} else {
p.Cmd = exec.Command(command[0])
}
var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdin pipe: %w", err)
}
p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdout pipe: %w", err)
}
p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("error opening stderr pipe: %w", err)
}
return p, nil
}
// Start the process
func (p *Process) Start() error {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
if err := p.cmdStart(); err != nil {
return err
}
p.mainLoopWg.Add(1)
go func() {
if err := p.cmdLoop(ctx); err != nil {
p.Log.Errorf("Process quit with message: %v", err)
}
p.mainLoopWg.Done()
}()
return nil
}
func (p *Process) Stop() {
if p.cancel != nil {
p.cancel()
}
p.mainLoopWg.Wait()
}
func (p *Process) cmdStart() error {
p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)
if err := p.Cmd.Start(); err != nil {
return fmt.Errorf("error starting process: %s", err)
}
return nil
}
// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
go func() {
<-ctx.Done()
if p.Stdin != nil {
p.Stdin.Close()
gracefulStop(p.Cmd, 5*time.Second)
}
}()
for {
err := p.cmdWait()
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
}
p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err)
p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay))
select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(p.RestartDelay)):
// Continue the loop and restart the process
if err := p.cmdStart(); err != nil {
return err
}
}
}
}
func (p *Process) cmdWait() error {
var wg sync.WaitGroup
if p.ReadStdoutFn == nil {
p.ReadStdoutFn = defaultReadPipe
}
if p.ReadStderrFn == nil {
p.ReadStderrFn = defaultReadPipe
}
wg.Add(1)
go func() {
p.ReadStdoutFn(p.Stdout)
wg.Done()
}()
wg.Add(1)
go func() {
p.ReadStderrFn(p.Stderr)
wg.Done()
}()
wg.Wait()
return p.Cmd.Wait()
}
func isQuitting(ctx context.Context) bool {
return ctx.Err() != nil
}
func defaultReadPipe(r io.Reader) {
io.Copy(ioutil.Discard, r)
}


@ -0,0 +1,28 @@
// +build !windows
package process
import (
"os/exec"
"syscall"
"time"
)
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}
})
}


@ -0,0 +1,19 @@
// +build windows
package process
import (
"os/exec"
"time"
)
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}


@ -2,16 +2,14 @@ package execd
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"os/exec"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
@ -45,15 +43,11 @@ type Execd struct {
Command []string
Signal string
RestartDelay config.Duration
Log telegraf.Logger
acc telegraf.Accumulator
cmd *exec.Cmd
parser parsers.Parser
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
process *process.Process
acc telegraf.Accumulator
parser parsers.Parser
}
func (e *Execd) SampleConfig() string {
@ -70,131 +64,25 @@ func (e *Execd) SetParser(parser parsers.Parser) {
func (e *Execd) Start(acc telegraf.Accumulator) error {
e.acc = acc
if len(e.Command) == 0 {
return fmt.Errorf("FATAL no command specified")
var err error
e.process, err = process.New(e.Command)
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStderrFn = e.cmdReadErr
e.mainLoopWg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
if err := e.cmdStart(); err != nil {
return err
if err = e.process.Start(); err != nil {
return fmt.Errorf("failed to start process %s: %w", e.Command, err)
}
go func() {
if err := e.cmdLoop(ctx); err != nil {
log.Printf("Process quit with message: %s", err.Error())
}
e.mainLoopWg.Done()
}()
return nil
}
func (e *Execd) Stop() {
// don't try to stop before all stream readers have started.
e.cancel()
e.mainLoopWg.Wait()
}
// cmdLoop watches an already running process, restarting it when appropriate.
func (e *Execd) cmdLoop(ctx context.Context) error {
for {
// Use a buffered channel to ensure goroutine below can exit
// if `ctx.Done` is selected and nothing reads on `done` anymore
done := make(chan error, 1)
go func() {
done <- e.cmdWait()
}()
select {
case <-ctx.Done():
if e.stdin != nil {
e.stdin.Close()
gracefulStop(e.cmd, 5*time.Second)
}
return nil
case err := <-done:
log.Printf("Process %s terminated: %s", e.Command, err)
if isQuitting(ctx) {
return err
}
}
log.Printf("Restarting in %s...", time.Duration(e.RestartDelay))
select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(e.RestartDelay)):
// Continue the loop and restart the process
if err := e.cmdStart(); err != nil {
return err
}
}
}
}
func isQuitting(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func (e *Execd) cmdStart() (err error) {
if len(e.Command) > 1 {
e.cmd = exec.Command(e.Command[0], e.Command[1:]...)
} else {
e.cmd = exec.Command(e.Command[0])
}
e.stdin, err = e.cmd.StdinPipe()
if err != nil {
return fmt.Errorf("Error opening stdin pipe: %s", err)
}
e.stdout, err = e.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("Error opening stdout pipe: %s", err)
}
e.stderr, err = e.cmd.StderrPipe()
if err != nil {
return fmt.Errorf("Error opening stderr pipe: %s", err)
}
log.Printf("Starting process: %s", e.Command)
err = e.cmd.Start()
if err != nil {
return fmt.Errorf("Error starting process: %s", err)
}
return nil
}
func (e *Execd) cmdWait() error {
var wg sync.WaitGroup
wg.Add(2)
go func() {
e.cmdReadOut(e.stdout)
wg.Done()
}()
go func() {
e.cmdReadErr(e.stderr)
wg.Done()
}()
wg.Wait()
return e.cmd.Wait()
e.process.Stop()
}
func (e *Execd) cmdReadOut(out io.Reader) {
@ -209,7 +97,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
for scanner.Scan() {
metrics, err := e.parser.Parse(scanner.Bytes())
if err != nil {
e.acc.AddError(fmt.Errorf("Parse error: %s", err))
e.acc.AddError(fmt.Errorf("parse error: %w", err))
}
for _, metric := range metrics {
@ -218,7 +106,7 @@ func (e *Execd) cmdReadOut(out io.Reader) {
}
if err := scanner.Err(); err != nil {
e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err))
e.acc.AddError(fmt.Errorf("error reading stdout: %w", err))
}
}
@ -249,14 +137,21 @@ func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
log.Printf("stderr: %q", scanner.Text())
e.Log.Errorf("stderr: %q", scanner.Text())
}
if err := scanner.Err(); err != nil {
e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err))
e.acc.AddError(fmt.Errorf("error reading stderr: %w", err))
}
}
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func init() {
inputs.Add("execd", func() telegraf.Input {
return &Execd{


@ -6,7 +6,6 @@ import (
"fmt"
"io"
"os"
"os/exec"
"syscall"
"time"
@ -14,22 +13,26 @@ import (
)
func (e *Execd) Gather(acc telegraf.Accumulator) error {
if e.cmd == nil || e.cmd.Process == nil {
if e.process == nil || e.process.Cmd == nil {
return nil
}
osProcess := e.process.Cmd.Process
if osProcess == nil {
return nil
}
switch e.Signal {
case "SIGHUP":
e.cmd.Process.Signal(syscall.SIGHUP)
osProcess.Signal(syscall.SIGHUP)
case "SIGUSR1":
e.cmd.Process.Signal(syscall.SIGUSR1)
osProcess.Signal(syscall.SIGUSR1)
case "SIGUSR2":
e.cmd.Process.Signal(syscall.SIGUSR2)
osProcess.Signal(syscall.SIGUSR2)
case "STDIN":
if osStdin, ok := e.stdin.(*os.File); ok {
if osStdin, ok := e.process.Stdin.(*os.File); ok {
osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second))
}
if _, err := io.WriteString(e.stdin, "\n"); err != nil {
if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil {
return fmt.Errorf("Error writing to stdin: %s", err)
}
case "none":
@ -39,11 +42,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
return nil
}
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
cmd.Process.Signal(syscall.SIGTERM)
go func() {
<-time.NewTimer(timeout).C
cmd.Process.Kill()
}()
}


@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
@ -19,15 +20,16 @@ import (
)
func TestExternalInputWorks(t *testing.T) {
jsonParser, err := parsers.NewInfluxParser()
influxParser, err := parsers.NewInfluxParser()
require.NoError(t, err)
e := &Execd{
Command: []string{shell(), fileShellScriptPath()},
RestartDelay: config.Duration(5 * time.Second),
parser: jsonParser,
parser: influxParser,
Signal: "STDIN",
}
e.Log = testutil.Logger{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
@ -64,6 +66,7 @@ func TestParsesLinesContainingNewline(t *testing.T) {
Signal: "STDIN",
acc: acc,
}
e.Log = testutil.Logger{}
cases := []struct {
Name string


@ -6,23 +6,22 @@ import (
"fmt"
"io"
"os"
"os/exec"
"time"
"github.com/influxdata/telegraf"
)
func (e *Execd) Gather(acc telegraf.Accumulator) error {
if e.cmd == nil || e.cmd.Process == nil {
if e.process == nil {
return nil
}
switch e.Signal {
case "STDIN":
if osStdin, ok := e.stdin.(*os.File); ok {
if osStdin, ok := e.process.Stdin.(*os.File); ok {
osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second))
}
if _, err := io.WriteString(e.stdin, "\n"); err != nil {
if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil {
return fmt.Errorf("Error writing to stdin: %s", err)
}
case "none":
@ -32,7 +31,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
return nil
}
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
cmd.Process.Kill()
}


@ -235,10 +235,6 @@ func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool)
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Connect
conn, err := net.Dial(protocol, address)
require.NotNil(t, conn)


@ -7,6 +7,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/dedup"
_ "github.com/influxdata/telegraf/plugins/processors/defaults"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/execd"
_ "github.com/influxdata/telegraf/plugins/processors/filepath"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser"


@ -0,0 +1,110 @@
# Execd Processor Plugin
The `execd` processor plugin runs an external program as a separate process, pipes
metrics into the process's STDIN, and reads processed metrics from its STDOUT.
The program must accept influx line protocol on standard input (STDIN) and write
metrics in influx line protocol to standard output (STDOUT).
Program output on standard error is mirrored to the Telegraf log.
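For example (the metric line below is illustrative, matching the multiplier examples further down), a program that doubles a `count` field reads one line per metric on STDIN and writes the processed line back on STDOUT:

```
# received on the program's STDIN
counter_ruby count=2 1586302128978187000

# written to the program's STDOUT
counter_ruby count=4 1586302128978187000
```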
### Caveats
- Metrics with tracking will be considered "delivered" as soon as they are passed
to the external process. There is currently no way to match up which metric
coming out of the execd process relates to which metric going in (keep in mind
that processors can add and drop metrics, and that this is all done
asynchronously).
- It is not currently possible to use a `data_format` other than "influx", because
the format must survive the serialize-parse round trip without losing any
critical type information.
### Configuration:
```toml
[[processors.execd]]
## Program to run as daemon
## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]
## Delay before the process is restarted after an unexpected termination
# restart_delay = "10s"
```
### Example
#### Go daemon example
This Go daemon reads metrics from STDIN, multiplies each "count" field by 2,
and writes the metrics back out.
```go
package main
import (
"fmt"
"os"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
)
func main() {
parser := influx.NewStreamParser(os.Stdin)
serializer, _ := serializers.NewInfluxSerializer()
for {
metric, err := parser.Next()
if err != nil {
if err == influx.EOF {
return // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
c, found := metric.GetField("count")
if !found {
fmt.Fprintf(os.Stderr, "metric has no count field\n")
os.Exit(1)
}
switch t := c.(type) {
case float64:
t *= 2
metric.AddField("count", t)
case int64:
t *= 2
metric.AddField("count", t)
default:
fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
os.Exit(1)
}
b, err := serializer.Serialize(metric)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
fmt.Fprint(os.Stdout, string(b))
}
}
```
To run it, build the binary with Go, e.g. `go build -o multiplier.exe main.go`, then point the processor at it:
```toml
[[processors.execd]]
command = ["multiplier.exe"]
```
#### Ruby daemon
- See [Ruby daemon](./examples/multiplier_line_protocol/multiplier_line_protocol.rb)
```toml
[[processors.execd]]
command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"]
```


@ -0,0 +1,14 @@
[agent]
interval = "10s"
[[inputs.execd]]
command = ["ruby", "plugins/inputs/execd/examples/count.rb"]
[[processors.execd]]
command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"]
[[outputs.file]]
files = ["stdout"]
data_format = "influx"


@ -0,0 +1,27 @@
#!/usr/bin/env ruby
loop do
# example input: "counter_ruby count=0 1586302128978187000"
line = STDIN.readline.chomp
# parse out influx line protocol sections with a really simple hand-rolled parser that doesn't support escaping.
# for a full line parser in ruby, check out something like the influxdb-lineprotocol-parser gem.
parts = line.split(" ")
case parts.size
when 3
measurement, fields, timestamp = parts
when 4
measurement, tags, fields, timestamp = parts
else
STDERR.puts "Unable to parse line protocol"
exit 1
end
fields = fields.split(",").map{|t|
k,v = t.split("=")
if k == "count"
v = v.to_i * 2 # multiply the count field by two
end
"#{k}=#{v}"
}.join(",")
puts [measurement, tags, fields, timestamp].select{|s| s && s.size != 0 }.join(" ")
STDOUT.flush
end


@ -0,0 +1,153 @@
package execd
import (
"bufio"
"errors"
"fmt"
"io"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
)
const sampleConfig = `
## Program to run as daemon
## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]
## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
`
type Execd struct {
Command []string `toml:"command"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger
parserConfig *parsers.Config
parser parsers.Parser
serializerConfig *serializers.Config
serializer serializers.Serializer
acc telegraf.Accumulator
process *process.Process
}
func New() *Execd {
return &Execd{
RestartDelay: config.Duration(10 * time.Second),
parserConfig: &parsers.Config{
DataFormat: "influx",
},
serializerConfig: &serializers.Config{
DataFormat: "influx",
},
}
}
func (e *Execd) SampleConfig() string {
return sampleConfig
}
func (e *Execd) Description() string {
return "Run executable as long-running processor plugin"
}
func (e *Execd) Start(acc telegraf.Accumulator) error {
var err error
e.parser, err = parsers.NewParser(e.parserConfig)
if err != nil {
return fmt.Errorf("error creating parser: %w", err)
}
e.serializer, err = serializers.NewSerializer(e.serializerConfig)
if err != nil {
return fmt.Errorf("error creating serializer: %w", err)
}
e.acc = acc
e.process, err = process.New(e.Command)
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStderrFn = e.cmdReadErr
if err = e.process.Start(); err != nil {
return fmt.Errorf("failed to start process %s: %w", e.Command, err)
}
return nil
}
func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
b, err := e.serializer.Serialize(m)
if err != nil {
return fmt.Errorf("metric serializing error: %w", err)
}
_, err = e.process.Stdin.Write(b)
if err != nil {
return fmt.Errorf("error writing to process stdin: %w", err)
}
// We cannot maintain tracking metrics at the moment because input/output
// is done asynchronously and we don't have any metric metadata to tie the
// output metric back to the original input metric.
m.Drop()
return nil
}
func (e *Execd) Stop() error {
e.process.Stop()
return nil
}
func (e *Execd) cmdReadOut(out io.Reader) {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
metrics, err := e.parser.Parse(scanner.Bytes())
if err != nil {
e.Log.Errorf("Parse error: %s", err)
}
for _, metric := range metrics {
e.acc.AddMetric(metric)
}
}
if err := scanner.Err(); err != nil {
e.Log.Errorf("Error reading stdout: %s", err)
}
}
func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
e.Log.Errorf("stderr: %q", scanner.Text())
}
if err := scanner.Err(); err != nil {
e.Log.Errorf("Error reading stderr: %s", err)
}
}
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func init() {
processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
return New()
})
}


@ -0,0 +1,135 @@
package execd
import (
"flag"
"fmt"
"os"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestExternalProcessorWorks(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
e.Command = []string{exe, "-countmultiplier"}
e.RestartDelay = config.Duration(5 * time.Second)
acc := &testutil.Accumulator{}
require.NoError(t, e.Start(acc))
now := time.Now()
orig := now
metrics := []telegraf.Metric{}
for i := 0; i < 10; i++ {
m, err := metric.New("test",
map[string]string{
"city": "Toronto",
},
map[string]interface{}{
"population": 6000000,
"count": 1,
},
now)
require.NoError(t, err)
metrics = append(metrics, m)
now = now.Add(1)
e.Add(m, acc)
}
acc.Wait(1)
require.NoError(t, e.Stop())
metrics = acc.GetTelegrafMetrics()
m := metrics[0]
expected := testutil.MustMetric("test",
map[string]string{
"city": "Toronto",
},
map[string]interface{}{
"population": 6000000,
"count": 2,
},
orig,
)
testutil.RequireMetricEqual(t, expected, m)
metricTime := m.Time().UnixNano()
// read the other 9 and make sure they're ordered properly
for i := 0; i < 9; i++ {
m = metrics[i+1]
require.EqualValues(t, metricTime+1, m.Time().UnixNano())
metricTime = m.Time().UnixNano()
}
}
var countmultiplier = flag.Bool("countmultiplier", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
flag.Parse()
if *countmultiplier {
runCountMultiplierProgram()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
func runCountMultiplierProgram() {
parser := influx.NewStreamParser(os.Stdin)
serializer, _ := serializers.NewInfluxSerializer()
for {
metric, err := parser.Next()
if err != nil {
if err == influx.EOF {
return // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
c, found := metric.GetField("count")
if !found {
fmt.Fprintf(os.Stderr, "metric has no count field\n")
os.Exit(1)
}
switch t := c.(type) {
case float64:
t *= 2
metric.AddField("count", t)
case int64:
t *= 2
metric.AddField("count", t)
default:
fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
os.Exit(1)
}
b, err := serializer.Serialize(metric)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
fmt.Fprint(os.Stdout, string(b))
}
}


@ -323,13 +323,13 @@ func (a *Accumulator) NFields() int {
// Wait waits for the given number of metrics to be added to the accumulator.
func (a *Accumulator) Wait(n int) {
a.Lock()
defer a.Unlock()
if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex)
}
for int(a.NMetrics()) < n {
a.Cond.Wait()
}
a.Unlock()
}
// WaitError waits for the given number of errors to be added to the accumulator.