feat(inputs.execd): Allow to provide logging prefixes on stderr (#15834)

This commit is contained in:
Sven Rebhan 2024-09-09 18:00:19 +02:00 committed by GitHub
parent 53fb5adac2
commit b0d44d88dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 179 additions and 5 deletions

View File

@ -11,7 +11,12 @@ collection interval. This is used for when you want to have Telegraf notify the
plugin when it's time to run collection. STDIN is recommended, which writes a plugin when it's time to run collection. STDIN is recommended, which writes a
new line to the process's STDIN. new line to the process's STDIN.
STDERR from the process will be relayed to Telegraf as errors in the logs. STDERR from the process will be relayed to Telegraf's logging facilities. By
default all messages on `stderr` will be logged as errors. However, you can
log to other levels by prefixing your message with `E!` for error, `W!` for
warning, `I!` for info, `D!` for debugging and `T!` for trace levels followed by
a space and the actual message. For example outputting `I! A log message` will
create a `info` log line in your Telegraf logging output.
[Input Data Formats]: ../../../docs/DATA_FORMATS_INPUT.md [Input Data Formats]: ../../../docs/DATA_FORMATS_INPUT.md
[inputs.exec]: ../exec/README.md [inputs.exec]: ../exec/README.md

View File

@ -146,7 +146,21 @@ func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)
for scanner.Scan() { for scanner.Scan() {
e.Log.Errorf("stderr: %q", scanner.Text()) msg := scanner.Text()
switch {
case strings.HasPrefix(msg, "E! "):
e.Log.Error(msg[3:])
case strings.HasPrefix(msg, "W! "):
e.Log.Warn(msg[3:])
case strings.HasPrefix(msg, "I! "):
e.Log.Info(msg[3:])
case strings.HasPrefix(msg, "D! "):
e.Log.Debug(msg[3:])
case strings.HasPrefix(msg, "T! "):
e.Log.Trace(msg[3:])
default:
e.Log.Errorf("stderr: %q", msg)
}
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {

View File

@ -215,6 +215,139 @@ func TestStopOnErrorSuccess(t *testing.T) {
}, 3*time.Second, 100*time.Millisecond) }, 3*time.Second, 100*time.Millisecond)
} }
func TestLoggingNoPrefix(t *testing.T) {
// Use own test as mocking executable
exe, err := os.Executable()
require.NoError(t, err)
// Setup the plugin with a capturing logger
var l testutil.CaptureLogger
plugin := &Execd{
Command: []string{exe, "-mode", "logging"},
Environment: []string{
"PLUGINS_INPUTS_EXECD_MODE=application",
"MESSAGE=this is an error",
},
Signal: "STDIN",
StopOnError: true,
RestartDelay: config.Duration(100 * time.Millisecond),
Log: &l,
}
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Run the plugin and trigger a report
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
// Wait for at least two metric as this indicates the process was restarted
require.Eventually(t, func() bool {
return acc.NMetrics() > 0 && l.NMessages() > 0
}, 3*time.Second, 100*time.Millisecond)
// Check the metric
expected := []telegraf.Metric{
metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
// Check the error message type
expectedLevel := byte(testutil.LevelError)
levels := make(map[byte]int, 0)
for _, m := range l.Messages() {
if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") {
continue
}
if m.Level != expectedLevel {
t.Logf("received msg %q (%s)", m.Text, string(m.Level))
} else {
require.Equal(t, "stderr: \"this is an error\"", m.Text)
}
levels[m.Level]++
}
require.Equal(t, 1, levels[testutil.LevelError])
require.Len(t, levels, 1)
}
func TestLoggingWithPrefix(t *testing.T) {
// Use own test as mocking executable
exe, err := os.Executable()
require.NoError(t, err)
tests := []struct {
name string
level byte
}{
{"error", testutil.LevelError},
{"warn", testutil.LevelWarn},
{"info", testutil.LevelInfo},
{"debug", testutil.LevelDebug},
{"trace", testutil.LevelTrace},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin with a capturing logger
var l testutil.CaptureLogger
plugin := &Execd{
Command: []string{exe, "-mode", "logging"},
Environment: []string{
"PLUGINS_INPUTS_EXECD_MODE=application",
fmt.Sprintf("MESSAGE=%s! a log message", string(tt.level)),
},
Signal: "STDIN",
StopOnError: true,
RestartDelay: config.Duration(100 * time.Millisecond),
Log: &l,
}
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Run the plugin and trigger a report
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
// Wait for at least two metric as this indicates the process was restarted
require.Eventually(t, func() bool {
return acc.NMetrics() > 0 && l.NMessages() > 0
}, 3*time.Second, 100*time.Millisecond)
// Check the metric
expected := []telegraf.Metric{
metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
// Check the error message type
expectedLevel := tt.level
levels := make(map[byte]int, 0)
for _, m := range l.Messages() {
if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") {
continue
}
if m.Level != expectedLevel {
t.Logf("received msg %q (%s)", m.Text, string(m.Level))
} else {
require.Equal(t, "a log message", m.Text)
}
levels[m.Level]++
}
require.Equal(t, 1, levels[tt.level])
require.Len(t, levels, 1)
})
}
}
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout) to := time.NewTimer(timeout)
defer to.Stop() defer to.Stop()
@ -269,6 +402,11 @@ func TestMain(m *testing.M) {
case "success": case "success":
fmt.Println("test value=42i") fmt.Println("test value=42i")
os.Exit(0) os.Exit(0)
case "logging":
if err := runLoggingProgram(); err != nil {
os.Exit(1)
}
os.Exit(0)
} }
os.Exit(23) os.Exit(23)
} }
@ -301,3 +439,21 @@ func runCounterProgram() error {
} }
return nil return nil
} }
func runLoggingProgram() error {
msg := os.Getenv("MESSAGE")
i := 0
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
if _, err := fmt.Fprintf(os.Stdout, "test value=%di\n", i); err != nil {
return err
}
if msg != "" {
if _, err := fmt.Fprintln(os.Stderr, msg); err != nil {
return err
}
}
}
return nil
}

View File

@ -50,9 +50,8 @@ func (l *CaptureLogger) loga(level byte, args ...any) {
l.print(Entry{level, l.Name, fmt.Sprint(args...)}) l.print(Entry{level, l.Name, fmt.Sprint(args...)})
} }
// We always want to output at debug level during testing to find issues easier func (l *CaptureLogger) Level() telegraf.LogLevel {
func (*CaptureLogger) Level() telegraf.LogLevel { return telegraf.Trace
return telegraf.Debug
} }
// Adding attributes is not supported by the test-logger // Adding attributes is not supported by the test-logger