telegraf/plugins/inputs/execd/execd_test.go

190 lines
4.1 KiB
Go
Raw Normal View History

package execd
import (
2020-07-02 23:59:29 +08:00
"bufio"
"flag"
"fmt"
2020-07-02 23:59:29 +08:00
"os"
"strings"
"testing"
"time"
2020-11-23 23:40:32 +08:00
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config"
2020-07-02 23:59:29 +08:00
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
2020-07-02 23:59:29 +08:00
"github.com/influxdata/telegraf/plugins/serializers"
2020-11-23 23:40:32 +08:00
"github.com/influxdata/telegraf/testutil"
)
func TestSettingConfigWorks(t *testing.T) {
cfg := `
[[inputs.execd]]
command = ["a", "b", "c"]
restart_delay = "1m"
signal = "SIGHUP"
`
conf := config.NewConfig()
require.NoError(t, conf.LoadConfigData([]byte(cfg)))
require.Len(t, conf.Inputs, 1)
inp, ok := conf.Inputs[0].Input.(*Execd)
require.True(t, ok)
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
require.EqualValues(t, "SIGHUP", inp.Signal)
}
func TestExternalInputWorks(t *testing.T) {
2020-06-05 07:09:22 +08:00
influxParser, err := parsers.NewInfluxParser()
require.NoError(t, err)
2020-07-02 23:59:29 +08:00
exe, err := os.Executable()
require.NoError(t, err)
e := &Execd{
2020-07-02 23:59:29 +08:00
Command: []string{exe, "-counter"},
RestartDelay: config.Duration(5 * time.Second),
2020-06-05 07:09:22 +08:00
parser: influxParser,
Signal: "STDIN",
2020-07-02 23:59:29 +08:00
Log: testutil.Logger{},
}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
require.NoError(t, e.Start(acc))
require.NoError(t, e.Gather(acc))
// grab a metric and make sure it's a thing
m := readChanWithTimeout(t, metrics, 10*time.Second)
e.Stop()
2020-07-02 23:59:29 +08:00
require.Equal(t, "counter", m.Name())
val, ok := m.GetField("count")
require.True(t, ok)
2020-07-02 23:59:29 +08:00
require.EqualValues(t, 0, val)
}
func TestParsesLinesContainingNewline(t *testing.T) {
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
e := &Execd{
RestartDelay: config.Duration(5 * time.Second),
parser: parser,
Signal: "STDIN",
acc: acc,
2020-07-02 23:59:29 +08:00
Log: testutil.Logger{},
}
cases := []struct {
Name string
Value string
}{
{
Name: "no-newline",
Value: "my message",
}, {
Name: "newline",
Value: "my\nmessage",
},
}
for _, test := range cases {
t.Run(test.Name, func(t *testing.T) {
line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value)
e.cmdReadOut(strings.NewReader(line))
m := readChanWithTimeout(t, metrics, 1*time.Second)
require.Equal(t, "event", m.Name())
val, ok := m.GetField("message")
require.True(t, ok)
require.Equal(t, test.Value, val)
})
}
}
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout)
defer to.Stop()
select {
case m := <-metrics:
return m
case <-to.C:
require.FailNow(t, "timeout waiting for metric")
}
return nil
}
type TestMetricMaker struct{}
func (tm *TestMetricMaker) Name() string {
return "TestPlugin"
}
func (tm *TestMetricMaker) LogName() string {
return tm.Name()
}
func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}
func (tm *TestMetricMaker) Log() telegraf.Logger {
return models.NewLogger("TestPlugin", "test", "")
}
2020-07-02 23:59:29 +08:00
var counter = flag.Bool("counter", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
flag.Parse()
if *counter {
runCounterProgram()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
func runCounterProgram() {
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load")
os.Exit(1)
}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
metric, _ := metric.New("counter",
map[string]string{},
map[string]interface{}{
"count": i,
},
time.Now(),
)
i++
b, err := serializer.Serialize(metric)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
}
fmt.Fprint(os.Stdout, string(b))
}
}