feat(outputs.exec): Add ability to exec command once per metric (#13672)
This commit is contained in:
parent
a8af81c8c9
commit
7673624bcd
|
|
@ -38,6 +38,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## Timeout for command to complete.
|
## Timeout for command to complete.
|
||||||
# timeout = "5s"
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Whether the command gets executed once per metric, or once per metric batch
|
||||||
|
## The serializer will also run in batch mode when this is true.
|
||||||
|
# use_batch_format = true
|
||||||
|
|
||||||
## Data format to output.
|
## Data format to output.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,11 @@ const maxStderrBytes = 512
|
||||||
|
|
||||||
// Exec defines the exec output plugin.
|
// Exec defines the exec output plugin.
|
||||||
type Exec struct {
|
type Exec struct {
|
||||||
Command []string `toml:"command"`
|
Command []string `toml:"command"`
|
||||||
Environment []string `toml:"environment"`
|
Environment []string `toml:"environment"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
UseBatchFormat bool `toml:"use_batch_format"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
runner Runner
|
runner Runner
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
|
|
@ -63,17 +64,32 @@ func (e *Exec) Close() error {
|
||||||
// Write writes the metrics to the configured command.
|
// Write writes the metrics to the configured command.
|
||||||
func (e *Exec) Write(metrics []telegraf.Metric) error {
|
func (e *Exec) Write(metrics []telegraf.Metric) error {
|
||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
serializedMetrics, err := e.serializer.SerializeBatch(metrics)
|
if e.UseBatchFormat {
|
||||||
if err != nil {
|
serializedMetrics, err := e.serializer.SerializeBatch(metrics)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
buffer.Write(serializedMetrics)
|
}
|
||||||
|
buffer.Write(serializedMetrics)
|
||||||
|
|
||||||
if buffer.Len() <= 0 {
|
if buffer.Len() <= 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
|
return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
|
||||||
|
}
|
||||||
|
errs := make([]error, 0, len(metrics))
|
||||||
|
for _, metric := range metrics {
|
||||||
|
serializedMetric, err := e.serializer.Serialize(metric)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
buffer.Reset()
|
||||||
|
buffer.Write(serializedMetric)
|
||||||
|
|
||||||
|
err = e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
return errors.Join(errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runner provides an interface for running exec.Cmd.
|
// Runner provides an interface for running exec.Cmd.
|
||||||
|
|
@ -149,7 +165,8 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string {
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("exec", func() telegraf.Output {
|
outputs.Add("exec", func() telegraf.Output {
|
||||||
return &Exec{
|
return &Exec{
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
UseBatchFormat: true,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,9 @@ package exec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -10,10 +13,90 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
influxParser "github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
type MockRunner struct {
|
||||||
|
runs []int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the command.
|
||||||
|
func (c *MockRunner) Run(timeout time.Duration, command []string, environments []string, buffer io.Reader) error {
|
||||||
|
parser := influxParser.NewStreamParser(buffer)
|
||||||
|
numMetrics := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, err := parser.Next()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, influxParser.EOF) {
|
||||||
|
break // stream ended
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
numMetrics++
|
||||||
|
}
|
||||||
|
|
||||||
|
c.runs = append(c.runs, numMetrics)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExternalOutputBatch(t *testing.T) {
|
||||||
|
serializer := &influx.Serializer{}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
|
runner := MockRunner{}
|
||||||
|
|
||||||
|
e := &Exec{
|
||||||
|
UseBatchFormat: true,
|
||||||
|
serializer: serializer,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
runner: &runner,
|
||||||
|
}
|
||||||
|
|
||||||
|
m := metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"name": "cpu1"},
|
||||||
|
map[string]interface{}{"idle": 50, "sys": 30},
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
|
||||||
|
require.NoError(t, e.Connect())
|
||||||
|
require.NoError(t, e.Write([]telegraf.Metric{m, m}))
|
||||||
|
// Make sure it executed the command once, with 2 metrics
|
||||||
|
require.Equal(t, []int{2}, runner.runs)
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExternalOutputNoBatch(t *testing.T) {
|
||||||
|
serializer := &influx.Serializer{}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
runner := MockRunner{}
|
||||||
|
|
||||||
|
e := &Exec{
|
||||||
|
UseBatchFormat: false,
|
||||||
|
serializer: serializer,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
runner: &runner,
|
||||||
|
}
|
||||||
|
|
||||||
|
m := metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"name": "cpu1"},
|
||||||
|
map[string]interface{}{"idle": 50, "sys": 30},
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
|
||||||
|
require.NoError(t, e.Connect())
|
||||||
|
require.NoError(t, e.Write([]telegraf.Metric{m, m}))
|
||||||
|
// Make sure it executed the command twice, both with a single metric
|
||||||
|
require.Equal(t, []int{1, 1}, runner.runs)
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func TestExec(t *testing.T) {
|
func TestExec(t *testing.T) {
|
||||||
t.Skip("Skipping test due to OS/executable dependencies and race condition when ran as part of a test-all")
|
t.Skip("Skipping test due to OS/executable dependencies and race condition when ran as part of a test-all")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,10 @@
|
||||||
## Timeout for command to complete.
|
## Timeout for command to complete.
|
||||||
# timeout = "5s"
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Whether the command gets executed once per metric, or once per metric batch
|
||||||
|
## The serializer will also run in batch mode when this is true.
|
||||||
|
# use_batch_format = true
|
||||||
|
|
||||||
## Data format to output.
|
## Data format to output.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue