fix(parsers): Unwrap parser and remove some special handling (#11826)

This commit is contained in:
Sven Rebhan 2022-09-19 17:57:24 +02:00 committed by GitHub
parent b3fc1b7631
commit 2b7cafcdbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 28 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -29,16 +30,22 @@ var sampleConfig string
const MaxStderrBytes int = 512
type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric
type Exec struct {
Commands []string `toml:"commands"`
Command string `toml:"command"`
Environment []string `toml:"environment"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
parser parsers.Parser
runner Runner
Log telegraf.Logger `toml:"-"`
// Allow post processing of command exit codes
exitcodeHandler exitcodeHandlerFunc
parseDespiteError bool
}
func NewExec() *Exec {
@ -134,10 +141,9 @@ func (*Exec) SampleConfig() string {
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
_, isNagios := e.parser.(*nagios.Parser)
out, errBuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout))
if !isNagios && runErr != nil {
if !e.parseDespiteError && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errBuf))
acc.AddError(err)
return
@ -149,8 +155,8 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
return
}
if isNagios {
metrics = nagios.AddState(runErr, errBuf, metrics)
if e.exitcodeHandler != nil {
metrics = e.exitcodeHandler(metrics, runErr, errBuf)
}
for _, m := range metrics {
@ -160,6 +166,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
func (e *Exec) SetParser(parser parsers.Parser) {
e.parser = parser
unwrapped, ok := parser.(*models.RunningParser)
if ok {
if _, ok := unwrapped.Parser.(*nagios.Parser); ok {
e.exitcodeHandler = nagiosHandler
e.parseDespiteError = true
}
}
}
func (e *Exec) Gather(acc telegraf.Accumulator) error {
@ -213,6 +226,10 @@ func (e *Exec) Init() error {
return nil
}
func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric {
return nagios.AddState(err, msg, metrics)
}
func init() {
inputs.Add("exec", func() telegraf.Input {
return NewExec()

View File

@ -13,10 +13,10 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
)
//go:embed sample.conf
@ -29,9 +29,10 @@ type Execd struct {
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger `toml:"-"`
process *process.Process
acc telegraf.Accumulator
parser parsers.Parser
process *process.Process
acc telegraf.Accumulator
parser parsers.Parser
outputReader func(io.Reader)
}
func (*Execd) SampleConfig() string {
@ -40,6 +41,14 @@ func (*Execd) SampleConfig() string {
func (e *Execd) SetParser(parser parsers.Parser) {
e.parser = parser
e.outputReader = e.cmdReadOut
unwrapped, ok := parser.(*models.RunningParser)
if ok {
if _, ok := unwrapped.Parser.(*influx.Parser); ok {
e.outputReader = e.cmdReadOutStream
}
}
}
func (e *Execd) Start(acc telegraf.Accumulator) error {
@ -51,7 +60,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.cmdReadOut
e.process.ReadStdoutFn = e.outputReader
e.process.ReadStderrFn = e.cmdReadErr
if err = e.process.Start(); err != nil {
@ -73,22 +82,10 @@ func (e *Execd) Stop() {
}
func (e *Execd) cmdReadOut(out io.Reader) {
if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
// work around the lack of built-in streaming parser. :(
e.cmdReadOutStream(out)
return
}
_, isPrometheus := e.parser.(*prometheus.Parser)
scanner := bufio.NewScanner(out)
for scanner.Scan() {
data := scanner.Bytes()
if isPrometheus {
data = append(data, []byte("\n")...)
}
metrics, err := e.parser.Parse(data)
if err != nil {
e.acc.AddError(fmt.Errorf("parse error: %w", err))

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
@ -42,7 +43,7 @@ func TestSettingConfigWorks(t *testing.T) {
}
func TestExternalInputWorks(t *testing.T) {
influxParser := &influx.Parser{}
influxParser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, influxParser.Init())
exe, err := os.Executable()
@ -52,10 +53,10 @@ func TestExternalInputWorks(t *testing.T) {
Command: []string{exe, "-counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
parser: influxParser,
Signal: "STDIN",
Log: testutil.Logger{},
}
e.SetParser(influxParser)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
@ -76,7 +77,7 @@ func TestExternalInputWorks(t *testing.T) {
}
func TestParsesLinesContainingNewline(t *testing.T) {
parser := &influx.Parser{}
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
metrics := make(chan telegraf.Metric, 10)
@ -85,11 +86,11 @@ func TestParsesLinesContainingNewline(t *testing.T) {
e := &Execd{
RestartDelay: config.Duration(5 * time.Second),
parser: parser,
Signal: "STDIN",
acc: acc,
Log: testutil.Logger{},
}
e.SetParser(parser)
cases := []struct {
Name string
@ -108,7 +109,7 @@ func TestParsesLinesContainingNewline(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value)
e.cmdReadOut(strings.NewReader(line))
e.outputReader(strings.NewReader(line))
m := readChanWithTimeout(t, metrics, 1*time.Second)
@ -120,6 +121,43 @@ func TestParsesLinesContainingNewline(t *testing.T) {
}
}
func TestParsesPrometheus(t *testing.T) {
parser := models.NewRunningParser(&prometheus.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
var acc testutil.Accumulator
e := &Execd{
RestartDelay: config.Duration(5 * time.Second),
Signal: "STDIN",
acc: &acc,
Log: testutil.Logger{},
}
e.SetParser(parser)
lines := `# HELP This is just a test metric.
# TYPE test summary
test{handler="execd",quantile="0.5"} 42.0
`
expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{"handler": "execd", "quantile": "0.5"},
map[string]interface{}{"test": float64(42.0)},
time.Unix(0, 0),
),
}
e.outputReader(strings.NewReader(lines))
check := func() bool { return acc.NMetrics() == uint64(len(expected)) }
require.Eventually(t, check, 1*time.Second, 100*time.Millisecond)
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout)
defer to.Stop()

View File

@ -30,8 +30,13 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
var parser expfmt.TextParser
var metrics []telegraf.Metric
var err error
// parse even if the buffer begins with a newline
// Make sure we have a finishing newline but no trailing one
buf = bytes.TrimPrefix(buf, []byte("\n"))
if !bytes.HasSuffix(buf, []byte("\n")) {
buf = append(buf, []byte("\n")...)
}
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)