2020-05-05 02:09:10 +08:00
|
|
|
package execd
|
|
|
|
|
|
|
|
|
|
import (
|
2020-07-02 23:59:29 +08:00
|
|
|
"bufio"
|
|
|
|
|
"flag"
|
2020-05-06 05:43:45 +08:00
|
|
|
"fmt"
|
2020-07-02 23:59:29 +08:00
|
|
|
"os"
|
2020-05-06 05:43:45 +08:00
|
|
|
"strings"
|
2020-05-05 02:09:10 +08:00
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
2020-05-05 02:09:10 +08:00
|
|
|
"github.com/influxdata/telegraf/agent"
|
|
|
|
|
"github.com/influxdata/telegraf/config"
|
2020-07-02 23:59:29 +08:00
|
|
|
"github.com/influxdata/telegraf/metric"
|
2020-05-05 02:09:10 +08:00
|
|
|
"github.com/influxdata/telegraf/models"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/parsers"
|
2020-07-02 23:59:29 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/serializers"
|
2020-11-23 23:40:32 +08:00
|
|
|
"github.com/influxdata/telegraf/testutil"
|
2020-05-05 02:09:10 +08:00
|
|
|
)
|
|
|
|
|
|
2020-07-22 01:06:33 +08:00
|
|
|
func TestSettingConfigWorks(t *testing.T) {
|
|
|
|
|
cfg := `
|
|
|
|
|
[[inputs.execd]]
|
|
|
|
|
command = ["a", "b", "c"]
|
|
|
|
|
restart_delay = "1m"
|
|
|
|
|
signal = "SIGHUP"
|
|
|
|
|
`
|
|
|
|
|
conf := config.NewConfig()
|
|
|
|
|
require.NoError(t, conf.LoadConfigData([]byte(cfg)))
|
|
|
|
|
|
|
|
|
|
require.Len(t, conf.Inputs, 1)
|
|
|
|
|
inp, ok := conf.Inputs[0].Input.(*Execd)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
|
|
|
|
|
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
|
|
|
|
|
require.EqualValues(t, "SIGHUP", inp.Signal)
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-05 02:09:10 +08:00
|
|
|
func TestExternalInputWorks(t *testing.T) {
|
2020-06-05 07:09:22 +08:00
|
|
|
influxParser, err := parsers.NewInfluxParser()
|
2020-05-05 02:09:10 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
2020-07-02 23:59:29 +08:00
|
|
|
exe, err := os.Executable()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
2020-05-05 02:09:10 +08:00
|
|
|
e := &Execd{
|
2020-07-02 23:59:29 +08:00
|
|
|
Command: []string{exe, "-counter"},
|
2020-05-05 02:09:10 +08:00
|
|
|
RestartDelay: config.Duration(5 * time.Second),
|
2020-06-05 07:09:22 +08:00
|
|
|
parser: influxParser,
|
2020-05-05 02:09:10 +08:00
|
|
|
Signal: "STDIN",
|
2020-07-02 23:59:29 +08:00
|
|
|
Log: testutil.Logger{},
|
2020-05-05 02:09:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metrics := make(chan telegraf.Metric, 10)
|
|
|
|
|
defer close(metrics)
|
|
|
|
|
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
|
|
|
|
|
|
|
|
|
|
require.NoError(t, e.Start(acc))
|
|
|
|
|
require.NoError(t, e.Gather(acc))
|
|
|
|
|
|
|
|
|
|
// grab a metric and make sure it's a thing
|
|
|
|
|
m := readChanWithTimeout(t, metrics, 10*time.Second)
|
|
|
|
|
|
2020-05-05 22:14:57 +08:00
|
|
|
e.Stop()
|
|
|
|
|
|
2020-07-02 23:59:29 +08:00
|
|
|
require.Equal(t, "counter", m.Name())
|
2020-05-05 02:09:10 +08:00
|
|
|
val, ok := m.GetField("count")
|
|
|
|
|
require.True(t, ok)
|
2020-07-02 23:59:29 +08:00
|
|
|
require.EqualValues(t, 0, val)
|
2020-05-05 02:09:10 +08:00
|
|
|
}
|
|
|
|
|
|
2020-05-06 05:43:45 +08:00
|
|
|
func TestParsesLinesContainingNewline(t *testing.T) {
|
|
|
|
|
parser, err := parsers.NewInfluxParser()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
metrics := make(chan telegraf.Metric, 10)
|
|
|
|
|
defer close(metrics)
|
|
|
|
|
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
|
|
|
|
|
|
|
|
|
|
e := &Execd{
|
|
|
|
|
RestartDelay: config.Duration(5 * time.Second),
|
|
|
|
|
parser: parser,
|
|
|
|
|
Signal: "STDIN",
|
|
|
|
|
acc: acc,
|
2020-07-02 23:59:29 +08:00
|
|
|
Log: testutil.Logger{},
|
2020-05-06 05:43:45 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cases := []struct {
|
|
|
|
|
Name string
|
|
|
|
|
Value string
|
|
|
|
|
}{
|
|
|
|
|
{
|
|
|
|
|
Name: "no-newline",
|
|
|
|
|
Value: "my message",
|
|
|
|
|
}, {
|
|
|
|
|
Name: "newline",
|
|
|
|
|
Value: "my\nmessage",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, test := range cases {
|
|
|
|
|
t.Run(test.Name, func(t *testing.T) {
|
|
|
|
|
line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value)
|
|
|
|
|
|
|
|
|
|
e.cmdReadOut(strings.NewReader(line))
|
|
|
|
|
|
|
|
|
|
m := readChanWithTimeout(t, metrics, 1*time.Second)
|
|
|
|
|
|
|
|
|
|
require.Equal(t, "event", m.Name())
|
|
|
|
|
val, ok := m.GetField("message")
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
require.Equal(t, test.Value, val)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-05 02:09:10 +08:00
|
|
|
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
|
|
|
|
|
to := time.NewTimer(timeout)
|
|
|
|
|
defer to.Stop()
|
|
|
|
|
select {
|
|
|
|
|
case m := <-metrics:
|
|
|
|
|
return m
|
|
|
|
|
case <-to.C:
|
|
|
|
|
require.FailNow(t, "timeout waiting for metric")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TestMetricMaker struct{}
|
|
|
|
|
|
|
|
|
|
func (tm *TestMetricMaker) Name() string {
|
|
|
|
|
return "TestPlugin"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tm *TestMetricMaker) LogName() string {
|
|
|
|
|
return tm.Name()
|
|
|
|
|
}
|
|
|
|
|
|
2021-06-21 23:07:52 +08:00
|
|
|
func (tm *TestMetricMaker) MakeMetric(aMetric telegraf.Metric) telegraf.Metric {
|
|
|
|
|
return aMetric
|
2020-05-05 02:09:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tm *TestMetricMaker) Log() telegraf.Logger {
|
|
|
|
|
return models.NewLogger("TestPlugin", "test", "")
|
|
|
|
|
}
|
2020-07-02 23:59:29 +08:00
|
|
|
|
|
|
|
|
// counter switches the test binary into helper mode: when the flag is set,
// TestMain runs the counter program instead of the test suite.
var counter = flag.Bool("counter", false, "if true, act like line input program instead of test")
|
|
|
|
|
|
|
|
|
|
func TestMain(m *testing.M) {
|
|
|
|
|
flag.Parse()
|
|
|
|
|
if *counter {
|
2021-04-23 05:08:03 +08:00
|
|
|
if err := runCounterProgram(); err != nil {
|
|
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
2020-07-02 23:59:29 +08:00
|
|
|
os.Exit(0)
|
|
|
|
|
}
|
|
|
|
|
code := m.Run()
|
|
|
|
|
os.Exit(code)
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-23 05:08:03 +08:00
|
|
|
func runCounterProgram() error {
|
2020-07-02 23:59:29 +08:00
|
|
|
i := 0
|
|
|
|
|
serializer, err := serializers.NewInfluxSerializer()
|
|
|
|
|
if err != nil {
|
2021-04-23 05:08:03 +08:00
|
|
|
//nolint:errcheck,revive // Test will fail anyway
|
2020-07-02 23:59:29 +08:00
|
|
|
fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load")
|
2021-04-23 05:08:03 +08:00
|
|
|
return err
|
2020-07-02 23:59:29 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
scanner := bufio.NewScanner(os.Stdin)
|
|
|
|
|
for scanner.Scan() {
|
2021-04-14 02:40:03 +08:00
|
|
|
m := metric.New("counter",
|
2020-07-02 23:59:29 +08:00
|
|
|
map[string]string{},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"count": i,
|
|
|
|
|
},
|
|
|
|
|
time.Now(),
|
|
|
|
|
)
|
|
|
|
|
i++
|
|
|
|
|
|
2021-04-14 02:40:03 +08:00
|
|
|
b, err := serializer.Serialize(m)
|
2020-07-02 23:59:29 +08:00
|
|
|
if err != nil {
|
2021-04-23 05:08:03 +08:00
|
|
|
//nolint:errcheck,revive // Test will fail anyway
|
2020-07-02 23:59:29 +08:00
|
|
|
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
2021-04-23 05:08:03 +08:00
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if _, err := fmt.Fprint(os.Stdout, string(b)); err != nil {
|
|
|
|
|
return err
|
2020-07-02 23:59:29 +08:00
|
|
|
}
|
|
|
|
|
}
|
2021-04-23 05:08:03 +08:00
|
|
|
return nil
|
2020-07-02 23:59:29 +08:00
|
|
|
}
|