diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index ae13081bf..cd34d7954 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -29,16 +30,22 @@ var sampleConfig string const MaxStderrBytes int = 512 +type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric + type Exec struct { Commands []string `toml:"commands"` Command string `toml:"command"` Environment []string `toml:"environment"` Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` parser parsers.Parser runner Runner - Log telegraf.Logger `toml:"-"` + + // Allow post processing of command exit codes + exitcodeHandler exitcodeHandlerFunc + parseDespiteError bool } func NewExec() *Exec { @@ -134,10 +141,9 @@ func (*Exec) SampleConfig() string { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { defer wg.Done() - _, isNagios := e.parser.(*nagios.Parser) out, errBuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout)) - if !isNagios && runErr != nil { + if !e.parseDespiteError && runErr != nil { err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errBuf)) acc.AddError(err) return @@ -149,8 +155,8 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync return } - if isNagios { - metrics = nagios.AddState(runErr, errBuf, metrics) + if e.exitcodeHandler != nil { + metrics = e.exitcodeHandler(metrics, runErr, errBuf) } for _, m := range metrics { @@ -160,6 +166,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync func (e *Exec) SetParser(parser parsers.Parser) { e.parser = parser + unwrapped, ok := parser.(*models.RunningParser) + if ok { + if _, ok := unwrapped.Parser.(*nagios.Parser); ok { + e.exitcodeHandler = nagiosHandler + e.parseDespiteError = true + } + } } func (e *Exec) Gather(acc telegraf.Accumulator) error { @@ -213,6 +226,10 @@ func (e *Exec) Init() error { return nil } +func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric { + return nagios.AddState(err, msg, metrics) +} + func init() { inputs.Add("exec", func() telegraf.Input { return NewExec() diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index 63f3179c0..813379875 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -13,10 +13,10 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/process" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/parsers/prometheus" ) //go:embed sample.conf @@ -29,9 +29,10 @@ type Execd struct { RestartDelay config.Duration `toml:"restart_delay"` Log telegraf.Logger `toml:"-"` - process *process.Process - acc telegraf.Accumulator - parser parsers.Parser + process *process.Process + acc telegraf.Accumulator + parser parsers.Parser + outputReader func(io.Reader) } func (*Execd) SampleConfig() string { @@ -40,6 +41,14 @@ func (*Execd) SampleConfig() string { func (e *Execd) SetParser(parser parsers.Parser) { e.parser = parser + e.outputReader = e.cmdReadOut + + unwrapped, ok := parser.(*models.RunningParser) + if ok { + if _, ok := unwrapped.Parser.(*influx.Parser); ok { + e.outputReader = e.cmdReadOutStream + } + } } func (e *Execd) Start(acc telegraf.Accumulator) error { @@ -51,7 +60,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { } e.process.Log = e.Log e.process.RestartDelay = time.Duration(e.RestartDelay) - e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStdoutFn = e.outputReader e.process.ReadStderrFn = e.cmdReadErr if err = e.process.Start(); err != nil { @@ -73,22 +82,10 @@ func (e *Execd) Stop() { } func (e *Execd) cmdReadOut(out io.Reader) { - if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser { - // work around the lack of built-in streaming parser. :( - e.cmdReadOutStream(out) - return - } - - _, isPrometheus := e.parser.(*prometheus.Parser) - scanner := bufio.NewScanner(out) for scanner.Scan() { data := scanner.Bytes() - if isPrometheus { - data = append(data, []byte("\n")...) - } - metrics, err := e.parser.Parse(data) if err != nil { e.acc.AddError(fmt.Errorf("parse error: %w", err)) diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 500107e4d..45879551f 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/prometheus" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" ) @@ -42,7 +43,7 @@ func TestSettingConfigWorks(t *testing.T) { } func TestExternalInputWorks(t *testing.T) { - influxParser := &influx.Parser{} + influxParser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) require.NoError(t, influxParser.Init()) exe, err := os.Executable() @@ -52,10 +53,10 @@ func TestExternalInputWorks(t *testing.T) { Command: []string{exe, "-counter"}, Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"}, RestartDelay: config.Duration(5 * time.Second), - parser: influxParser, Signal: "STDIN", Log: testutil.Logger{}, } + e.SetParser(influxParser) metrics := make(chan telegraf.Metric, 10) defer close(metrics) @@ -76,7 +77,7 @@ func TestExternalInputWorks(t *testing.T) { } func TestParsesLinesContainingNewline(t *testing.T) { - parser := &influx.Parser{} + parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) require.NoError(t, parser.Init()) metrics := make(chan telegraf.Metric, 10) @@ -85,11 +86,11 @@ func TestParsesLinesContainingNewline(t *testing.T) { e := &Execd{ RestartDelay: config.Duration(5 * time.Second), - parser: parser, Signal: "STDIN", acc: acc, Log: testutil.Logger{}, } + e.SetParser(parser) cases := []struct { Name string @@ -108,7 +109,7 @@ func TestParsesLinesContainingNewline(t *testing.T) { t.Run(test.Name, func(t *testing.T) { line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value) - e.cmdReadOut(strings.NewReader(line)) + e.outputReader(strings.NewReader(line)) m := readChanWithTimeout(t, metrics, 1*time.Second) @@ -120,6 +121,43 @@ func TestParsesLinesContainingNewline(t *testing.T) { } } +func TestParsesPrometheus(t *testing.T) { + parser := models.NewRunningParser(&prometheus.Parser{}, &models.ParserConfig{}) + require.NoError(t, parser.Init()) + + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + + var acc testutil.Accumulator + + e := &Execd{ + RestartDelay: config.Duration(5 * time.Second), + Signal: "STDIN", + acc: &acc, + Log: testutil.Logger{}, + } + e.SetParser(parser) + + lines := `# HELP This is just a test metric. +# TYPE test summary +test{handler="execd",quantile="0.5"} 42.0 +` + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{"handler": "execd", "quantile": "0.5"}, + map[string]interface{}{"test": float64(42.0)}, + time.Unix(0, 0), + ), + } + + e.outputReader(strings.NewReader(lines)) + check := func() bool { return acc.NMetrics() == uint64(len(expected)) } + require.Eventually(t, check, 1*time.Second, 100*time.Millisecond) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { to := time.NewTimer(timeout) defer to.Stop() diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go index fb34311f2..c4b03d8e9 100644 --- a/plugins/parsers/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser.go @@ -30,8 +30,13 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { var parser expfmt.TextParser var metrics []telegraf.Metric var err error - // parse even if the buffer begins with a newline + + // Make sure we have a finishing newline but no trailing one buf = bytes.TrimPrefix(buf, []byte("\n")) + if !bytes.HasSuffix(buf, []byte("\n")) { + buf = append(buf, []byte("\n")...) + } + // Read raw data buffer := bytes.NewBuffer(buf) reader := bufio.NewReader(buffer)