From b0d44d88dcf74eea9b74a40fb5c7bfdadbd894fa Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:00:19 +0200 Subject: [PATCH] feat(inputs.execd): Allow to provide logging prefixes on stderr (#15834) --- plugins/inputs/execd/README.md | 7 +- plugins/inputs/execd/execd.go | 16 ++- plugins/inputs/execd/execd_test.go | 156 +++++++++++++++++++++++++++++ testutil/capturelog.go | 5 +- 4 files changed, 179 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/execd/README.md b/plugins/inputs/execd/README.md index 5b5dc414d..c08015b1c 100644 --- a/plugins/inputs/execd/README.md +++ b/plugins/inputs/execd/README.md @@ -11,7 +11,12 @@ collection interval. This is used for when you want to have Telegraf notify the plugin when it's time to run collection. STDIN is recommended, which writes a new line to the process's STDIN. -STDERR from the process will be relayed to Telegraf as errors in the logs. +STDERR from the process will be relayed to Telegraf's logging facilities. By +default all messages on `stderr` will be logged as errors. However, you can +log to other levels by prefixing your message with `E!` for error, `W!` for +warning, `I!` for info, `D!` for debugging and `T!` for trace levels followed by +a space and the actual message. For example outputting `I! A log message` will +create a `info` log line in your Telegraf logging output. [Input Data Formats]: ../../../docs/DATA_FORMATS_INPUT.md [inputs.exec]: ../exec/README.md diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index ee7cbc621..774c063a9 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -146,7 +146,21 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - e.Log.Errorf("stderr: %q", scanner.Text()) + msg := scanner.Text() + switch { + case strings.HasPrefix(msg, "E! "): + e.Log.Error(msg[3:]) + case strings.HasPrefix(msg, "W! "): + e.Log.Warn(msg[3:]) + case strings.HasPrefix(msg, "I! "): + e.Log.Info(msg[3:]) + case strings.HasPrefix(msg, "D! "): + e.Log.Debug(msg[3:]) + case strings.HasPrefix(msg, "T! "): + e.Log.Trace(msg[3:]) + default: + e.Log.Errorf("stderr: %q", msg) + } } if err := scanner.Err(); err != nil { diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 9a0ea715e..34a0f748d 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -215,6 +215,139 @@ func TestStopOnErrorSuccess(t *testing.T) { }, 3*time.Second, 100*time.Millisecond) } +func TestLoggingNoPrefix(t *testing.T) { + // Use own test as mocking executable + exe, err := os.Executable() + require.NoError(t, err) + + // Setup the plugin with a capturing logger + var l testutil.CaptureLogger + plugin := &Execd{ + Command: []string{exe, "-mode", "logging"}, + Environment: []string{ + "PLUGINS_INPUTS_EXECD_MODE=application", + "MESSAGE=this is an error", + }, + Signal: "STDIN", + StopOnError: true, + RestartDelay: config.Duration(100 * time.Millisecond), + Log: &l, + } + + parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Run the plugin and trigger a report + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + + // Wait for at least two metric as this indicates the process was restarted + require.Eventually(t, func() bool { + return acc.NMetrics() > 0 && l.NMessages() > 0 + }, 3*time.Second, 100*time.Millisecond) + + // Check the metric + expected := []telegraf.Metric{ + metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + + // Check the error message type + expectedLevel := byte(testutil.LevelError) + levels := make(map[byte]int, 0) + for _, m := range l.Messages() { + if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") { + continue + } + if m.Level != expectedLevel { + t.Logf("received msg %q (%s)", m.Text, string(m.Level)) + } else { + require.Equal(t, "stderr: \"this is an error\"", m.Text) + } + levels[m.Level]++ + } + require.Equal(t, 1, levels[testutil.LevelError]) + require.Len(t, levels, 1) +} + +func TestLoggingWithPrefix(t *testing.T) { + // Use own test as mocking executable + exe, err := os.Executable() + require.NoError(t, err) + + tests := []struct { + name string + level byte + }{ + {"error", testutil.LevelError}, + {"warn", testutil.LevelWarn}, + {"info", testutil.LevelInfo}, + {"debug", testutil.LevelDebug}, + {"trace", testutil.LevelTrace}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the plugin with a capturing logger + var l testutil.CaptureLogger + plugin := &Execd{ + Command: []string{exe, "-mode", "logging"}, + Environment: []string{ + "PLUGINS_INPUTS_EXECD_MODE=application", + fmt.Sprintf("MESSAGE=%s! a log message", string(tt.level)), + }, + Signal: "STDIN", + StopOnError: true, + RestartDelay: config.Duration(100 * time.Millisecond), + Log: &l, + } + + parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Run the plugin and trigger a report + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + + // Wait for at least two metric as this indicates the process was restarted + require.Eventually(t, func() bool { + return acc.NMetrics() > 0 && l.NMessages() > 0 + }, 3*time.Second, 100*time.Millisecond) + + // Check the metric + expected := []telegraf.Metric{ + metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + + // Check the error message type + expectedLevel := tt.level + levels := make(map[byte]int, 0) + for _, m := range l.Messages() { + if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") { + continue + } + if m.Level != expectedLevel { + t.Logf("received msg %q (%s)", m.Text, string(m.Level)) + } else { + require.Equal(t, "a log message", m.Text) + } + levels[m.Level]++ + } + require.Equal(t, 1, levels[tt.level]) + require.Len(t, levels, 1) + }) + } +} + func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { to := time.NewTimer(timeout) defer to.Stop() @@ -269,6 +402,11 @@ func TestMain(m *testing.M) { case "success": fmt.Println("test value=42i") os.Exit(0) + case "logging": + if err := runLoggingProgram(); err != nil { + os.Exit(1) + } + os.Exit(0) } os.Exit(23) } @@ -301,3 +439,21 @@ func runCounterProgram() error { } return nil } + +func runLoggingProgram() error { + msg := os.Getenv("MESSAGE") + + i := 0 + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + if _, err := fmt.Fprintf(os.Stdout, "test value=%di\n", i); err != nil { + return err + } + if msg != "" { + if _, err := fmt.Fprintln(os.Stderr, msg); err != nil { + return err + } + } + } + return nil +} diff --git a/testutil/capturelog.go b/testutil/capturelog.go index 0d204ead9..d8397ab17 100644 --- a/testutil/capturelog.go +++ b/testutil/capturelog.go @@ -50,9 +50,8 @@ func (l *CaptureLogger) loga(level byte, args ...any) { l.print(Entry{level, l.Name, fmt.Sprint(args...)}) } -// We always want to output at debug level during testing to find issues easier -func (*CaptureLogger) Level() telegraf.LogLevel { - return telegraf.Debug +func (l *CaptureLogger) Level() telegraf.LogLevel { + return telegraf.Trace } // Adding attributes is not supported by the test-logger