chore(logging): Implement early logging (#15629)

This commit is contained in:
Sven Rebhan 2024-07-26 17:09:21 +02:00 committed by GitHub
parent 085acb23a9
commit ef41198481
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 548 additions and 446 deletions

View File

@ -3,7 +3,6 @@ package agent
import (
"bytes"
"errors"
"log"
"os"
"testing"
"time"
@ -49,8 +48,8 @@ func TestAddFields(t *testing.T) {
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
logger.RedirectLogging(errBuf)
defer logger.RedirectLogging(os.Stderr)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
@ -157,5 +156,5 @@ func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
}
func (tm *TestMetricMaker) Log() telegraf.Logger {
return logger.NewLogger("TestPlugin", "test", "")
return logger.New("TestPlugin", "test", "")
}

View File

@ -137,11 +137,6 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
return fmt.Errorf("unknown command %q", cCtx.Args().First())
}
err := logger.SetupLogging(&logger.Config{})
if err != nil {
return err
}
// Deprecated: Use execd instead
// Load external plugins, if requested.
if cCtx.String("plugin-directory") != "" {

View File

@ -466,11 +466,11 @@ func (c *Config) LoadConfig(path string) error {
data, _, err := LoadConfigFileWithRetries(path, c.Agent.ConfigURLRetryAttempts)
if err != nil {
return fmt.Errorf("error loading config file %s: %w", path, err)
return fmt.Errorf("loading config file %s failed: %w", path, err)
}
if err = c.LoadConfigData(data); err != nil {
return fmt.Errorf("error loading config file %s: %w", path, err)
return fmt.Errorf("loading config file %s failed: %w", path, err)
}
return nil
@ -912,7 +912,7 @@ func (c *Config) addSecretStore(name string, table *ast.Table) error {
return err
}
logger := logging.NewLogger("secretstores", name, "")
logger := logging.New("secretstores", name, "")
models.SetLoggerOnPlugin(store, logger)
if err := store.Init(); err != nil {

View File

@ -497,7 +497,7 @@ func TestConfig_BadOrdering(t *testing.T) {
require.Error(t, err, "bad ordering")
require.Equal(
t,
"error loading config file ./testdata/non_slice_slice.toml: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)",
"loading config file ./testdata/non_slice_slice.toml failed: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)",
err.Error(),
)
}
@ -541,11 +541,11 @@ func TestConfig_URLLikeFileName(t *testing.T) {
// The error file not found error message is different on Windows
require.Equal(
t,
"error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: The system cannot find the file specified.",
"loading config file http:##www.example.com.conf failed: open http:##www.example.com.conf: The system cannot find the file specified.",
err.Error(),
)
} else {
require.Equal(t, "error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: no such file or directory", err.Error())
require.Equal(t, "loading config file http:##www.example.com.conf failed: open http:##www.example.com.conf: no such file or directory", err.Error())
}
}
@ -639,7 +639,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
formatCfg := &cfg
formatCfg.DataFormat = format
logger := logging.NewLogger("serializers", format, "test")
logger := logging.New("serializers", format, "test")
var serializer telegraf.Serializer
if creator, found := serializers.Serializers[format]; found {
@ -731,7 +731,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
formatCfg := &cfg
formatCfg.DataFormat = format
logger := logging.NewLogger("serializers", format, "test")
logger := logging.New("serializers", format, "test")
var serializer serializers.Serializer
if creator, found := serializers.Serializers[format]; found {
@ -837,7 +837,7 @@ func TestConfig_ParserInterface(t *testing.T) {
expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
logger := logging.NewLogger("parsers", format, "parser_test_new")
logger := logging.New("parsers", format, "parser_test_new")
creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)
@ -1043,7 +1043,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) {
expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
logger := logging.NewLogger("parsers", format, "processors_with_parsers")
logger := logging.New("parsers", format, "processors_with_parsers")
creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)

View File

@ -383,7 +383,7 @@ func TestURLRetries3Fails(t *testing.T) {
}))
defer ts.Close()
expected := fmt.Sprintf("error loading config file %s: failed to fetch HTTP config: 404 Not Found", ts.URL)
expected := fmt.Sprintf("loading config file %s failed: failed to fetch HTTP config: 404 Not Found", ts.URL)
c := NewConfig()
err := c.LoadConfig(ts.URL)

View File

@ -212,7 +212,6 @@ following works:
- github.com/influxdata/line-protocol [MIT License](https://github.com/influxdata/line-protocol/blob/v2/LICENSE)
- github.com/influxdata/tail [MIT License](https://github.com/influxdata/tail/blob/master/LICENSE.txt)
- github.com/influxdata/toml [MIT License](https://github.com/influxdata/toml/blob/master/LICENSE)
- github.com/influxdata/wlog [MIT License](https://github.com/influxdata/wlog/blob/master/LICENSE)
- github.com/intel/iaevents [Apache License 2.0](https://github.com/intel/iaevents/blob/main/LICENSE)
- github.com/intel/powertelemetry [Apache License 2.0](https://github.com/intel/powertelemetry/blob/main/LICENSE)
- github.com/jackc/chunkreader [MIT License](https://github.com/jackc/chunkreader/blob/master/LICENSE)

1
go.mod
View File

@ -118,7 +118,6 @@ require (
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/influxdata/tail v1.0.1-0.20240719165826-3c9d721090d2
github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8
github.com/intel/iaevents v1.1.0
github.com/intel/powertelemetry v1.0.1
github.com/jackc/pgconn v1.14.3

2
go.sum
View File

@ -1620,8 +1620,6 @@ github.com/influxdata/tail v1.0.1-0.20240719165826-3c9d721090d2 h1:W68O0w2sRiBXo
github.com/influxdata/tail v1.0.1-0.20240719165826-3c9d721090d2/go.mod h1:VeiWgI3qaGdJWust2fP27a6J+koITo/1c/UhxeOxgaM=
github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 h1:vvyMtD5LTJc1W9sQKjDkAWdcg0478CszSdzlHtiAXCY=
github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65/go.mod h1:zApaNFpP/bTpQItGZNNUMISDMDAnTXu9UqJ4yT3ocz8=
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 h1:W2IgzRCb0L9VzMujq/QuTaZUKcH8096jWwP519mHN6Q=
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8/go.mod h1:/2NMgWB1DHM1ti/gqhOlg+LJeBVk6FqR5aVGYY0hlwI=
github.com/intel/iaevents v1.1.0 h1:FzxMBfXk/apG2EUXUCfaq3gUQ+q+TgZ1HNMjjUILUGE=
github.com/intel/iaevents v1.1.0/go.mod h1:CyUUzXw0lHRCsmyyF7Pwco9Y7NiTNQUUlcJ7RJAazKs=
github.com/intel/powertelemetry v1.0.1 h1:a35pZbqOnJlEYGEPXM+YKtetu6D6dJD4Jb4GS4Zetxs=

View File

@ -3,6 +3,19 @@ package telegraf
// LogLevel denotes the level for logging
type LogLevel int
const (
// None means nothing is logged
None LogLevel = iota
// Error will log error messages
Error
// Warn will log error messages and warnings
Warn
// Info will log error messages, warnings and information messages
Info
// Debug will log all of the above and debugging messages issued by plugins
Debug
)
func (e LogLevel) String() string {
switch e {
case Error:
@ -17,27 +30,29 @@ func (e LogLevel) String() string {
return "NONE"
}
const (
// None means nothing is logged
None LogLevel = iota
// Error will log error messages
Error
// Warn will log error messages and warnings
Warn
// Info will log error messages, warnings and information messages
Info
// Debug will log all of the above and debugging messages issued by plugins
Debug
)
func (e LogLevel) Indicator() string {
switch e {
case Error:
return "E!"
case Warn:
return "W!"
case Info:
return "I!"
case Debug:
return "D!"
}
return "U!"
}
func (e LogLevel) Includes(level LogLevel) bool {
return e >= level
}
// Logger defines an plugin-related interface for logging.
type Logger interface {
// Level returns the configured log-level of the logger
Level() LogLevel
// RegisterErrorCallback registers a callback triggered when logging errors
RegisterErrorCallback(func())
// Errorf logs an error message, patterned after log.Printf.
Errorf(format string, args ...interface{})
// Error logs an error message, patterned after log.Print.

View File

@ -5,12 +5,10 @@ import (
"io"
"log"
"os"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/rotate"
"github.com/influxdata/wlog"
)
const (
@ -19,142 +17,33 @@ const (
)
type defaultLogger struct {
Category string
Name string
Alias string
LogLevel telegraf.LogLevel
prefix string
onError []func()
writer io.Writer
internalWriter io.Writer
timezone *time.Location
logger *log.Logger
}
func (t *defaultLogger) Write(b []byte) (n int, err error) {
var line []byte
timeToPrint := time.Now().In(t.timezone)
func (l *defaultLogger) Close() error {
writer := l.logger.Writer()
if !prefixRegex.Match(b) {
line = append([]byte(timeToPrint.Format(time.RFC3339)+" I! "), b...)
} else {
line = append([]byte(timeToPrint.Format(time.RFC3339)+" "), b...)
}
return t.writer.Write(line)
}
// NewLogger creates a new logger instance
func (t *defaultLogger) New(category, name, alias string) telegraf.Logger {
var prefix string
if category != "" {
prefix = "[" + category
if name != "" {
prefix += "." + name
}
if alias != "" {
prefix += "::" + alias
}
prefix += "] "
}
return &defaultLogger{
Category: category,
Name: name,
Alias: alias,
LogLevel: t.LogLevel,
prefix: prefix,
writer: t.writer,
internalWriter: t.internalWriter,
timezone: t.timezone,
}
}
func (t *defaultLogger) Close() error {
// avoid closing stderr
if t.internalWriter == os.Stderr {
// Close the writer if possible and avoid closing stderr
if writer == os.Stderr {
return nil
}
closer, isCloser := t.internalWriter.(io.Closer)
if !isCloser {
return errors.New("the underlying writer cannot be closed")
}
return closer.Close()
}
// OnErr defines a callback that triggers only when errors are about to be written to the log
func (t *defaultLogger) RegisterErrorCallback(f func()) {
t.onError = append(t.onError, f)
}
func (t *defaultLogger) Level() telegraf.LogLevel {
return t.LogLevel
}
// Errorf logs an error message, patterned after log.Printf.
func (t *defaultLogger) Errorf(format string, args ...interface{}) {
log.Printf("E! "+t.prefix+format, args...)
for _, f := range t.onError {
f()
}
}
// Error logs an error message, patterned after log.Print.
func (t *defaultLogger) Error(args ...interface{}) {
for _, f := range t.onError {
f()
}
log.Print(append([]interface{}{"E! " + t.prefix}, args...)...)
}
// Debugf logs a debug message, patterned after log.Printf.
func (t *defaultLogger) Debugf(format string, args ...interface{}) {
log.Printf("D! "+t.prefix+" "+format, args...)
}
// Debug logs a debug message, patterned after log.Print.
func (t *defaultLogger) Debug(args ...interface{}) {
log.Print(append([]interface{}{"D! " + t.prefix}, args...)...)
}
// Warnf logs a warning message, patterned after log.Printf.
func (t *defaultLogger) Warnf(format string, args ...interface{}) {
log.Printf("W! "+t.prefix+format, args...)
}
// Warn logs a warning message, patterned after log.Print.
func (t *defaultLogger) Warn(args ...interface{}) {
log.Print(append([]interface{}{"W! " + t.prefix}, args...)...)
}
// Infof logs an information message, patterned after log.Printf.
func (t *defaultLogger) Infof(format string, args ...interface{}) {
log.Printf("I! "+t.prefix+format, args...)
}
// Info logs an information message, patterned after log.Print.
func (t *defaultLogger) Info(args ...interface{}) {
log.Print(append([]interface{}{"I! " + t.prefix}, args...)...)
}
func createDefaultLogger(cfg *Config) (logger, error) {
log.SetFlags(0)
// Set the log-level
switch cfg.logLevel {
case telegraf.Error:
wlog.SetLevel(wlog.ERROR)
case telegraf.Warn:
wlog.SetLevel(wlog.WARN)
case telegraf.Info:
wlog.SetLevel(wlog.INFO)
case telegraf.Debug:
wlog.SetLevel(wlog.DEBUG)
if closer, ok := writer.(io.Closer); ok {
return closer.Close()
}
// Setup the writer target
return errors.New("the underlying writer cannot be closed")
}
func (l *defaultLogger) SetOutput(w io.Writer) {
l.logger.SetOutput(w)
}
func (l *defaultLogger) Print(level telegraf.LogLevel, ts time.Time, prefix string, args ...interface{}) {
msg := append([]interface{}{ts.Format(time.RFC3339), " ", level.Indicator(), " ", prefix}, args...)
l.logger.Print(msg...)
}
func createDefaultLogger(cfg *Config) (sink, error) {
var writer io.Writer = os.Stderr
if cfg.LogTarget == "file" && cfg.Logfile != "" {
w, err := rotate.NewFileWriter(
@ -169,25 +58,7 @@ func createDefaultLogger(cfg *Config) (logger, error) {
writer = w
}
// Get configured timezone
timezoneName := cfg.LogWithTimezone
if strings.EqualFold(timezoneName, "local") {
timezoneName = "Local"
}
tz, err := time.LoadLocation(timezoneName)
if err != nil {
return nil, errors.New("error while setting logging timezone: " + err.Error())
}
// Setup the logger
l := &defaultLogger{
writer: wlog.NewWriter(writer),
internalWriter: writer,
timezone: tz,
}
log.SetOutput(l)
return l, nil
return &defaultLogger{logger: log.New(writer, "", 0)}, nil
}
func init() {

View File

@ -6,59 +6,97 @@ import (
"os"
"path/filepath"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/require"
"github.com/influxdata/wlog"
)
func TestWriteLogToFile(t *testing.T) {
func TestLogTargetDefault(t *testing.T) {
instance = defaultHandler()
cfg := &Config{
Quiet: true,
}
require.NoError(t, SetupLogging(cfg))
logger, ok := instance.impl.(*defaultLogger)
require.True(t, ok, "logging instance is not a default-logger")
require.Equal(t, logger.logger.Writer(), os.Stderr)
}
func TestLogTargetStderr(t *testing.T) {
instance = defaultHandler()
cfg := &Config{
LogTarget: "stderr",
Quiet: true,
}
require.NoError(t, SetupLogging(cfg))
logger, ok := instance.impl.(*defaultLogger)
require.True(t, ok, "logging instance is not a default-logger")
require.Equal(t, logger.logger.Writer(), os.Stderr)
}
func TestLogTargetFile(t *testing.T) {
tmpfile, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
cfg := createBasicConfig(tmpfile.Name())
err = SetupLogging(cfg)
require.NoError(t, err)
cfg := &Config{
Logfile: tmpfile.Name(),
LogTarget: "file",
RotationMaxArchives: -1,
}
require.NoError(t, SetupLogging(cfg))
log.Printf("I! TEST")
log.Printf("D! TEST") // <- should be ignored
f, err := os.ReadFile(tmpfile.Name())
buf, err := os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
require.Greater(t, len(buf), 19)
require.Equal(t, buf[19:], []byte("Z I! TEST\n"))
}
func TestDebugWriteLogToFile(t *testing.T) {
func TestLogTargetFileDebug(t *testing.T) {
tmpfile, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
cfg := createBasicConfig(tmpfile.Name())
cfg.Debug = true
err = SetupLogging(cfg)
require.NoError(t, err)
cfg := &Config{
Logfile: tmpfile.Name(),
LogTarget: "file",
RotationMaxArchives: -1,
Debug: true,
}
require.NoError(t, SetupLogging(cfg))
log.Printf("D! TEST")
f, err := os.ReadFile(tmpfile.Name())
buf, err := os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z D! TEST\n"))
require.Greater(t, len(buf), 19)
require.Equal(t, buf[19:], []byte("Z D! TEST\n"))
}
func TestErrorWriteLogToFile(t *testing.T) {
func TestLogTargetFileError(t *testing.T) {
tmpfile, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
cfg := createBasicConfig(tmpfile.Name())
cfg.Quiet = true
err = SetupLogging(cfg)
require.NoError(t, err)
cfg := &Config{
Logfile: tmpfile.Name(),
LogTarget: "file",
RotationMaxArchives: -1,
Quiet: true,
}
require.NoError(t, SetupLogging(cfg))
log.Printf("E! TEST")
log.Printf("I! TEST") // <- should be ignored
f, err := os.ReadFile(tmpfile.Name())
buf, err := os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z E! TEST\n"))
require.Greater(t, len(buf), 19)
require.Equal(t, buf[19:], []byte("Z E! TEST\n"))
}
func TestAddDefaultLogLevel(t *testing.T) {
@ -66,15 +104,20 @@ func TestAddDefaultLogLevel(t *testing.T) {
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
cfg := createBasicConfig(tmpfile.Name())
cfg.Debug = true
err = SetupLogging(cfg)
require.NoError(t, err)
cfg := &Config{
Logfile: tmpfile.Name(),
LogTarget: "file",
RotationMaxArchives: -1,
Debug: true,
}
require.NoError(t, SetupLogging(cfg))
log.Printf("TEST")
f, err := os.ReadFile(tmpfile.Name())
buf, err := os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
require.Greater(t, len(buf), 19)
require.Equal(t, buf[19:], []byte("Z I! TEST\n"))
}
func TestWriteToTruncatedFile(t *testing.T) {
@ -82,15 +125,20 @@ func TestWriteToTruncatedFile(t *testing.T) {
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
cfg := createBasicConfig(tmpfile.Name())
cfg.Debug = true
err = SetupLogging(cfg)
require.NoError(t, err)
cfg := &Config{
Logfile: tmpfile.Name(),
LogTarget: "file",
RotationMaxArchives: -1,
Debug: true,
}
require.NoError(t, SetupLogging(cfg))
log.Printf("TEST")
f, err := os.ReadFile(tmpfile.Name())
buf, err := os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
require.Greater(t, len(buf), 19)
require.Equal(t, buf[19:], []byte("Z I! TEST\n"))
tmpf, err := os.OpenFile(tmpfile.Name(), os.O_RDWR|os.O_TRUNC, 0640)
require.NoError(t, err)
@ -98,65 +146,42 @@ func TestWriteToTruncatedFile(t *testing.T) {
log.Printf("SHOULD BE FIRST")
f, err = os.ReadFile(tmpfile.Name())
buf, err = os.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, f[19:], []byte("Z I! SHOULD BE FIRST\n"))
require.Equal(t, buf[19:], []byte("Z I! SHOULD BE FIRST\n"))
}
func TestWriteToFileInRotation(t *testing.T) {
tempDir := t.TempDir()
cfg := createBasicConfig(filepath.Join(tempDir, "test.log"))
cfg.RotationMaxSize = 30
cfg := &Config{
Logfile: filepath.Join(tempDir, "test.log"),
LogTarget: "file",
RotationMaxArchives: -1,
RotationMaxSize: 30,
}
require.NoError(t, SetupLogging(cfg))
// Close the writer here, otherwise the temp folder cannot be deleted because the current log file is in use.
t.Cleanup(func() { require.NoError(t, instance.Close()) })
defer CloseLogging() //nolint:errcheck // We cannot do anything if this fails
log.Printf("I! TEST 1") // Writes 31 bytes, will rotate
log.Printf("I! TEST") // Writes 29 byes, no rotation expected
files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Len(t, files, 2)
}
func TestLogTargetSettings(t *testing.T) {
instance = nil
cfg := &Config{
LogTarget: "",
Quiet: true,
}
require.NoError(t, SetupLogging(cfg))
logger, isTelegrafLogger := instance.(*defaultLogger)
require.True(t, isTelegrafLogger)
require.Equal(t, logger.internalWriter, os.Stderr)
cfg = &Config{
LogTarget: "stderr",
Quiet: true,
}
require.NoError(t, SetupLogging(cfg))
logger, isTelegrafLogger = instance.(*defaultLogger)
require.True(t, isTelegrafLogger)
require.Equal(t, logger.internalWriter, os.Stderr)
}
func BenchmarkTelegrafLogWrite(b *testing.B) {
l, err := createDefaultLogger(&Config{})
require.NoError(b, err)
// Discard all logging output
dl := l.(*defaultLogger)
dl.writer = wlog.NewWriter(io.Discard)
dl.internalWriter = io.Discard
dl.SetOutput(io.Discard)
ts := time.Now()
for i := 0; i < b.N; i++ {
dl.Info("test")
}
}
func createBasicConfig(filename string) *Config {
return &Config{
Logfile: filename,
LogTarget: "file",
RotationMaxArchives: -1,
dl.Print(telegraf.Debug, ts, "", "test")
}
}

View File

@ -5,7 +5,8 @@ package logger
import (
"fmt"
"log"
"strings"
"os"
"time"
"github.com/influxdata/telegraf"
"golang.org/x/sys/windows/svc/eventlog"
@ -18,134 +19,37 @@ const (
)
type eventLogger struct {
Category string
Name string
Alias string
LogLevel telegraf.LogLevel
prefix string
onError []func()
eventlog *eventlog.Log
errlog *log.Logger
}
func (e *eventLogger) Write(b []byte) (int, error) {
loc := prefixRegex.FindIndex(b)
n := len(b)
if loc == nil {
return n, e.eventlog.Info(1, string(b))
}
// Skip empty log messages
if n <= 2 {
return 0, nil
}
line := strings.Trim(string(b[loc[1]:]), " \t\r\n")
switch rune(b[loc[0]]) {
case 'I':
return n, e.eventlog.Info(eidInfo, line)
case 'W':
return n, e.eventlog.Warning(eidWarning, line)
case 'E':
return n, e.eventlog.Error(eidError, line)
}
return n, nil
func (l *eventLogger) Close() error {
return l.eventlog.Close()
}
// NewLogger creates a new logger instance
func (e *eventLogger) New(category, name, alias string) telegraf.Logger {
var prefix string
if category != "" {
prefix = "[" + category
if name != "" {
prefix += "." + name
}
if alias != "" {
prefix += "::" + alias
}
prefix += "] "
}
return &eventLogger{
Category: category,
Name: name,
Alias: alias,
LogLevel: e.LogLevel,
prefix: prefix,
eventlog: e.eventlog,
}
}
func (e *eventLogger) Close() error {
return e.eventlog.Close()
}
// OnErr defines a callback that triggers only when errors are about to be written to the log
func (e *eventLogger) RegisterErrorCallback(f func()) {
e.onError = append(e.onError, f)
}
func (e *eventLogger) Level() telegraf.LogLevel {
return e.LogLevel
}
// Errorf logs an error message, patterned after log.Printf.
func (e *eventLogger) Errorf(format string, args ...interface{}) {
e.Error(fmt.Sprintf(format, args...))
}
// Error logs an error message, patterned after log.Print.
func (e *eventLogger) Error(args ...interface{}) {
if e.LogLevel >= telegraf.Error {
if err := e.eventlog.Error(eidError, "E! "+e.prefix+fmt.Sprint(args...)); err != nil {
log.Printf("E! Writing log message failed: %v", err)
}
}
for _, f := range e.onError {
f()
}
}
// Warnf logs a warning message, patterned after log.Printf.
func (e *eventLogger) Warnf(format string, args ...interface{}) {
e.Warn(fmt.Sprintf(format, args...))
}
// Warn logs a warning message, patterned after log.Print.
func (e *eventLogger) Warn(args ...interface{}) {
if e.LogLevel < telegraf.Warn {
func (l *eventLogger) Print(level telegraf.LogLevel, _ time.Time, prefix string, args ...interface{}) {
// Skip debug and beyond as they cannot be logged
if level >= telegraf.Debug {
return
}
if err := e.eventlog.Warning(eidError, "W! "+e.prefix+fmt.Sprint(args...)); err != nil {
log.Printf("E! Writing log message failed: %v", err)
msg := level.Indicator() + " " + prefix + fmt.Sprint(args...)
var err error
switch level {
case telegraf.Error:
err = l.eventlog.Error(eidError, msg)
case telegraf.Warn:
err = l.eventlog.Warning(eidWarning, msg)
case telegraf.Info:
err = l.eventlog.Info(eidInfo, msg)
}
if err != nil {
l.errlog.Printf("E! Writing log message failed: %v", err)
}
}
// Infof logs an information message, patterned after log.Printf.
func (e *eventLogger) Infof(format string, args ...interface{}) {
e.Info(fmt.Sprintf(format, args...))
}
// Info logs an information message, patterned after log.Print.
func (e *eventLogger) Info(args ...interface{}) {
if e.LogLevel < telegraf.Info {
return
}
if err := e.eventlog.Info(eidError, "I! "+e.prefix+fmt.Sprint(args...)); err != nil {
log.Printf("E! Writing log message failed: %v", err)
}
}
// No debugging output for eventlog to not spam the service
func (e *eventLogger) Debugf(string, ...interface{}) {}
// No debugging output for eventlog to not spam the service
func (e *eventLogger) Debug(...interface{}) {}
func createEventLogger(cfg *Config) (logger, error) {
func createEventLogger(cfg *Config) (sink, error) {
eventLog, err := eventlog.Open(cfg.InstanceName)
if err != nil {
return nil, err
@ -153,10 +57,9 @@ func createEventLogger(cfg *Config) (logger, error) {
l := &eventLogger{
eventlog: eventLog,
errlog: log.New(os.Stderr, "", 0),
}
log.SetOutput(l)
return l, nil
}

115
logger/handler.go Normal file
View File

@ -0,0 +1,115 @@
package logger
import (
"container/list"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/influxdata/telegraf"
)
type entry struct {
timestamp time.Time
level telegraf.LogLevel
prefix string
args []interface{}
}
type handler struct {
level telegraf.LogLevel
timezone *time.Location
impl sink
earlysink *log.Logger
earlylogs *list.List
sync.Mutex
}
func defaultHandler() *handler {
return &handler{
level: telegraf.Info,
timezone: time.UTC,
earlysink: log.New(os.Stderr, "", 0),
earlylogs: list.New(),
}
}
func redirectHandler(w io.Writer) *handler {
return &handler{
level: 99,
timezone: time.UTC,
impl: &redirectLogger{writer: w},
earlysink: log.New(w, "", 0),
earlylogs: list.New(),
}
}
func (h *handler) switchSink(impl sink, level telegraf.LogLevel, tz *time.Location, skipEarlyLogs bool) {
// Setup the new sink etc
h.impl = impl
h.level = level
h.timezone = tz
// Use the new logger to output the early log-messages
h.Lock()
if !skipEarlyLogs && h.earlylogs.Len() > 0 {
current := h.earlylogs.Front()
for current != nil {
e := current.Value.(*entry)
h.impl.Print(e.level, e.timestamp.In(h.timezone), e.prefix, e.args...)
next := current.Next()
h.earlylogs.Remove(current)
current = next
}
}
h.Unlock()
}
func (h *handler) add(level telegraf.LogLevel, ts time.Time, prefix string, args ...interface{}) *entry {
e := &entry{
timestamp: ts,
level: level,
prefix: prefix,
args: args,
}
h.Lock()
h.earlylogs.PushBack(e)
h.Unlock()
return e
}
func (h *handler) close() error {
if h.impl == nil {
return nil
}
h.Lock()
current := h.earlylogs.Front()
for current != nil {
h.earlylogs.Remove(current)
current = h.earlylogs.Front()
}
h.Unlock()
if l, ok := h.impl.(io.Closer); ok {
return l.Close()
}
return nil
}
// Logger to redirect the logs to an arbitrary writer
type redirectLogger struct {
writer io.Writer
}
func (l *redirectLogger) Print(level telegraf.LogLevel, ts time.Time, prefix string, args ...interface{}) {
msg := append([]interface{}{ts.In(time.UTC).Format(time.RFC3339), " ", level.Indicator(), " ", prefix}, args...)
fmt.Fprintln(l.writer, msg...)
}

View File

@ -2,19 +2,169 @@ package logger
import (
"fmt"
"regexp"
"io"
"log"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
)
var prefixRegex = regexp.MustCompile("^[DIWE]!")
// Central handler for the logs used by the logger to actually output the logs.
// This is necessary to be able to dynamically switch the sink even though
// plugins already instantiated a logger _before_ the final sink is set up.
var (
instance *handler // handler for the actual output
once sync.Once // once token to initialize the handler only once
)
type logger interface {
telegraf.Logger
New(category, name, alias string) telegraf.Logger
Close() error
// sink interface that has to be implemented by a logging sink
type sink interface {
Print(telegraf.LogLevel, time.Time, string, ...interface{})
}
// Attr represents an attribute appended to structured logging
type Attr struct {
Key string
Value interface{}
}
// logger is the actual implementation of the telegraf logger interface
type logger struct {
level *telegraf.LogLevel
category string
name string
alias string
suffix string
prefix string
onError []func()
}
// New creates a new logging instance to be used in models
func New(category, name, alias string) *logger {
l := &logger{
category: category,
name: name,
alias: alias,
}
l.formatPrefix()
return l
}
// SubLogger creates a new logger with the given name added as suffix
func (l *logger) SubLogger(name string) telegraf.Logger {
suffix := l.suffix
if suffix != "" && name != "" {
suffix += "."
}
suffix += name
nl := &logger{
level: l.level,
category: l.category,
name: l.name,
alias: l.alias,
suffix: suffix,
}
nl.formatPrefix()
return nl
}
func (l *logger) formatPrefix() {
l.prefix = l.category
if l.prefix != "" && l.name != "" {
l.prefix += "."
}
l.prefix += l.name
if l.prefix != "" && l.alias != "" {
l.prefix += "::"
}
l.prefix += l.alias
if l.suffix != "" {
l.prefix += "(" + l.suffix + ")"
}
if l.prefix != "" {
l.prefix = "[" + l.prefix + "] "
}
}
// Level returns the current log-level of the logger
func (l *logger) Level() telegraf.LogLevel {
if l.level != nil {
return *l.level
}
return instance.level
}
// Register a callback triggered when errors are about to be written to the log
func (l *logger) RegisterErrorCallback(f func()) {
l.onError = append(l.onError, f)
}
// Error logging including callbacks
func (l *logger) Errorf(format string, args ...interface{}) {
l.Error(fmt.Sprintf(format, args...))
}
func (l *logger) Error(args ...interface{}) {
l.Print(telegraf.Error, time.Now(), args...)
for _, f := range l.onError {
f()
}
}
// Warning logging
func (l *logger) Warnf(format string, args ...interface{}) {
l.Warn(fmt.Sprintf(format, args...))
}
func (l *logger) Warn(args ...interface{}) {
l.Print(telegraf.Warn, time.Now(), args...)
}
// Info logging
func (l *logger) Infof(format string, args ...interface{}) {
l.Info(fmt.Sprintf(format, args...))
}
func (l *logger) Info(args ...interface{}) {
l.Print(telegraf.Info, time.Now(), args...)
}
// Debug logging, this is suppressed on console
func (l *logger) Debugf(format string, args ...interface{}) {
l.Debug(fmt.Sprintf(format, args...))
}
func (l *logger) Debug(args ...interface{}) {
l.Print(telegraf.Debug, time.Now(), args...)
}
func (l *logger) Print(level telegraf.LogLevel, ts time.Time, args ...interface{}) {
// Check if we are in early logging state and store the message in this case
if instance.impl == nil {
instance.add(level, ts, l.prefix, args...)
}
// Skip all messages with insufficient log-levels
if l.level != nil && !l.level.Includes(level) || l.level == nil && !instance.level.Includes(level) {
return
}
if instance.impl != nil {
instance.impl.Print(level, ts.In(instance.timezone), l.prefix, args...)
} else {
msg := append([]interface{}{ts.In(instance.timezone).Format(time.RFC3339), " ", level.Indicator(), " ", l.prefix}, args...)
instance.earlysink.Print(msg...)
}
}
type Config struct {
@ -43,11 +193,6 @@ type Config struct {
logLevel telegraf.LogLevel
}
// Keep track what is actually set as a log output, because log package doesn't provide a getter.
// It allows closing previous writer if re-set and have possibility to test what is actually set
var instance logger
var once sync.Once
// SetupLogging configures the logging output.
func SetupLogging(cfg *Config) error {
if cfg.Debug {
@ -64,14 +209,24 @@ func SetupLogging(cfg *Config) error {
cfg.InstanceName = "telegraf"
}
if cfg.LogTarget == "" {
if cfg.LogTarget == "" || cfg.LogTarget == "file" && cfg.Logfile == "" {
cfg.LogTarget = "stderr"
}
// Get configured timezone
timezoneName := cfg.LogWithTimezone
if strings.EqualFold(timezoneName, "local") {
timezoneName = "Local"
}
tz, err := time.LoadLocation(timezoneName)
if err != nil {
return fmt.Errorf("setting logging timezone failed: %w", err)
}
// Get the logging factory
creator, ok := registry[cfg.LogTarget]
if !ok {
return fmt.Errorf("unsupported logtarget: %s, using stderr", cfg.LogTarget)
return fmt.Errorf("unsupported log target: %s, using stderr", cfg.LogTarget)
}
// Create the root logging instance
@ -85,26 +240,28 @@ func SetupLogging(cfg *Config) error {
return err
}
// Use the new logger and store a reference
instance = l
// Update the logging instance
instance.switchSink(l, cfg.logLevel, tz, cfg.LogTarget == "stderr")
return nil
}
func NewLogger(category, name, alias string) telegraf.Logger {
return instance.New(category, name, alias)
func RedirectLogging(w io.Writer) {
instance = redirectHandler(w)
}
func CloseLogging() error {
if instance != nil {
return instance.Close()
}
return nil
return instance.close()
}
func init() {
once.Do(func() {
//nolint:errcheck // This should always succeed with the default config
SetupLogging(&Config{})
// Create a special logging instance that additionally buffers all
// messages logged before the final logger is up.
instance = defaultHandler()
// Redirect the standard logger output to our logger instance
log.SetFlags(0)
log.SetOutput(&stdlogRedirector{})
})
}

View File

@ -13,7 +13,7 @@ func TestErrorCounting(t *testing.T) {
"errors",
map[string]string{"input": "test"},
)
iLog := NewLogger("inputs", "test", "")
iLog := New("inputs", "test", "")
iLog.RegisterErrorCallback(func() {
reg.Incr(1)
})

View File

@ -1,6 +1,6 @@
package logger
type creator func(cfg *Config) (logger, error)
type creator func(cfg *Config) (sink, error)
var registry = make(map[string]creator)

View File

@ -0,0 +1,38 @@
package logger
import (
"bytes"
"regexp"
)
var prefixRegex = regexp.MustCompile("^[DIWE]!")
type stdlogRedirector struct {
log logger
}
func (s *stdlogRedirector) Write(b []byte) (n int, err error) {
msg := bytes.Trim(b, " \t\r\n")
// Extract the log-level indicator; use info by default
loc := prefixRegex.FindIndex(b)
level := 'I'
if loc != nil {
level = rune(b[loc[0]])
msg = bytes.Trim(msg[loc[1]:], " \t\r\n")
}
// Log with the given level
switch level {
case 'D':
s.log.Debug(string(msg))
case 'I':
s.log.Info(string(msg))
case 'W':
s.log.Warn(string(msg))
case 'E':
s.log.Error(string(msg))
}
return len(b), nil
}

View File

@ -31,7 +31,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
}
aggErrorsRegister := selfstat.Register("aggregate", "errors", tags)
logger := logging.NewLogger("aggregators", config.Name, config.Alias)
logger := logging.New("aggregators", config.Name, config.Alias)
logger.RegisterErrorCallback(func() {
aggErrorsRegister.Incr(1)
})

View File

@ -41,7 +41,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
}
inputErrorsRegister := selfstat.Register("gather", "errors", tags)
logger := logging.NewLogger("inputs", config.Name, config.Alias)
logger := logging.New("inputs", config.Name, config.Alias)
logger.RegisterErrorCallback(func() {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)

View File

@ -80,7 +80,7 @@ func NewRunningOutput(
}
writeErrorsRegister := selfstat.Register("write", "errors", tags)
logger := logging.NewLogger("outputs", config.Name, config.Alias)
logger := logging.New("outputs", config.Name, config.Alias)
logger.RegisterErrorCallback(func() {
writeErrorsRegister.Incr(1)
})

View File

@ -24,7 +24,7 @@ func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningPars
}
parserErrorsRegister := selfstat.Register("parser", "errors", tags)
logger := logging.NewLogger("parsers", config.DataFormat+"::"+config.Parent, config.Alias)
logger := logging.New("parsers", config.DataFormat+"::"+config.Parent, config.Alias)
logger.RegisterErrorCallback(func() {
parserErrorsRegister.Incr(1)
})

View File

@ -37,7 +37,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
}
processErrorsRegister := selfstat.Register("process", "errors", tags)
logger := logging.NewLogger("processors", config.Name, config.Alias)
logger := logging.New("processors", config.Name, config.Alias)
logger.RegisterErrorCallback(func() {
processErrorsRegister.Incr(1)
})

View File

@ -34,7 +34,7 @@ func NewRunningSerializer(serializer serializers.Serializer, config *SerializerC
}
serializerErrorsRegister := selfstat.Register("serializer", "errors", tags)
logger := logging.NewLogger("serializers", config.DataFormat+"::"+config.Parent, config.Alias)
logger := logging.New("serializers", config.DataFormat+"::"+config.Parent, config.Alias)
logger.RegisterErrorCallback(func() {
serializerErrorsRegister.Incr(1)
})

View File

@ -29,5 +29,5 @@ func (l *DebugLogger) Println(v ...interface{}) {
// SetLogger configures a debug logger for kafka (sarama)
func (k *Logger) SetLogger() {
sarama.Logger = &DebugLogger{Log: logger.NewLogger("sarama", "", "")}
sarama.Logger = &DebugLogger{Log: logger.New("sarama", "", "")}
}

View File

@ -79,7 +79,7 @@ func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
}
if cfg.ClientTrace {
log := &mqttLogger{logger.NewLogger("paho", "", "")}
log := &mqttLogger{logger.New("paho", "", "")}
mqttv3.ERROR = log
mqttv3.CRITICAL = log
mqttv3.WARN = log

View File

@ -122,7 +122,7 @@ func (m *mqttv5Client) Connect() (bool, error) {
m.options.ConnectPassword = []byte(pass.String())
if m.clientTrace {
log := mqttLogger{logger.NewLogger("paho", "", "")}
log := mqttLogger{logger.New("paho", "", "")}
m.options.Debug = log
m.options.Errors = log
}

View File

@ -60,7 +60,7 @@ func New() *Shim {
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: logger.NewLogger("", "", ""),
log: logger.New("", "", ""),
}
}

View File

@ -4,13 +4,13 @@ import (
"bufio"
"errors"
"io"
"log"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/logger"
)
func TestShimSetsUpLogger(t *testing.T) {
@ -45,7 +45,7 @@ func runErroringInputPlugin(t *testing.T, interval time.Duration, stdin io.Reade
}
if stderr != nil {
shim.stderr = stderr
log.SetOutput(stderr)
logger.RedirectLogging(stderr)
}
err := shim.AddInput(inp)
require.NoError(t, err)

View File

@ -231,7 +231,7 @@ func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
}
func (tm *testMetricMaker) Log() telegraf.Logger {
return logger.NewLogger("test", "test", "")
return logger.New("test", "test", "")
}
type testOutput struct {

View File

@ -242,7 +242,7 @@ func (tm *TestMetricMaker) MakeMetric(aMetric telegraf.Metric) telegraf.Metric {
}
func (tm *TestMetricMaker) Log() telegraf.Logger {
return logger.NewLogger("TestPlugin", "test", "")
return logger.New("TestPlugin", "test", "")
}
func TestMain(m *testing.M) {

View File

@ -11,12 +11,10 @@ import (
_ "embed"
"encoding/binary"
"fmt"
"log"
"net"
"net/http"
"net/http/fcgi"
"net/http/httptest"
"os"
"strconv"
"testing"
"time"
@ -26,7 +24,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/shim"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -389,7 +386,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t
var acc testutil.Accumulator
err := acc.GatherError(r.Gather)
require.ErrorContains(t, err, `unable to connect to phpfpm status page 'http://aninvalidone'`)
require.ErrorContains(t, err, `unable to connect to phpfpm status page "http://aninvalidone"`)
require.ErrorContains(t, err, `lookup aninvalidone`)
}
@ -425,29 +422,20 @@ slow requests: 1
var outputSampleJSON []byte
func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *testing.T) {
p := &phpfpm{}
// AddInput sets the Logger
if err := shim.New().AddInput(p); err != nil {
t.Error(err)
return
}
// capture log output
var logOutput bytes.Buffer
log.SetOutput(&logOutput)
defer func() {
log.SetOutput(os.Stderr)
}()
// Capture the logging output for checking
logger := &testutil.CaptureLogger{Name: "inputs.phpfpm"}
plugin := &phpfpm{Log: logger}
require.NoError(t, plugin.Init())
// parse valid JSON without panic and without log output
validJSON := outputSampleJSON
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(validJSON), &testutil.NopAccumulator{}, "") })
require.Equal(t, "", logOutput.String())
require.NotPanics(t, func() { plugin.parseJSON(bytes.NewReader(validJSON), &testutil.NopAccumulator{}, "") })
require.Empty(t, logger.NMessages())
// parse invalid JSON without panic but with log output
invalidJSON := []byte("X")
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
require.Contains(t, logOutput.String(), "E! Unable to decode JSON response: invalid character 'X' looking for beginning of value")
require.NotPanics(t, func() { plugin.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
require.Contains(t, logger.Errors(), "E! [inputs.phpfpm] Unable to decode JSON response: invalid character 'X' looking for beginning of value")
}
func TestGatherDespiteUnavailable(t *testing.T) {

View File

@ -487,7 +487,7 @@ func newPostgresql() *Postgresql {
TagTableCreateTemplates: []*sqltemplate.Template{{}},
TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
RetryMaxBackoff: config.Duration(time.Second * 15),
Logger: logger.NewLogger("outputs", "postgresql", ""),
Logger: logger.New("outputs", "postgresql", ""),
LogLevel: "warn",
}

View File

@ -11,7 +11,7 @@ import (
)
func TestCreateCounterCacheEntry(t *testing.T) {
cc := NewCounterCache(logger.NewLogger("outputs", "stackdriver", "TestCreateCounterCacheEntry"))
cc := NewCounterCache(logger.New("outputs", "stackdriver", "TestCreateCounterCacheEntry"))
value := &monpb.TypedValue{
Value: &monpb.TypedValue_Int64Value{
Int64Value: int64(1),
@ -25,7 +25,7 @@ func TestCreateCounterCacheEntry(t *testing.T) {
}
func TestUpdateCounterCacheEntry(t *testing.T) {
cc := NewCounterCache(logger.NewLogger("outputs", "stackdriver", "TestUpdateCounterCacheEntry"))
cc := NewCounterCache(logger.New("outputs", "stackdriver", "TestUpdateCounterCacheEntry"))
now := time.Now().UTC()
value := &monpb.TypedValue{
Value: &monpb.TypedValue_Int64Value{
@ -63,7 +63,7 @@ func TestUpdateCounterCacheEntry(t *testing.T) {
}
func TestCounterCounterCacheEntryReset(t *testing.T) {
cc := NewCounterCache(logger.NewLogger("outputs", "stackdriver", "TestCounterCounterCacheEntryReset"))
cc := NewCounterCache(logger.New("outputs", "stackdriver", "TestCounterCounterCacheEntryReset"))
now := time.Now().UTC()
backdatedNow := now.Add(time.Millisecond * -1)
value := &monpb.TypedValue{
@ -103,7 +103,7 @@ func TestCounterCounterCacheEntryReset(t *testing.T) {
}
func TestCounterCacheDayRollover(t *testing.T) {
cc := NewCounterCache(logger.NewLogger("outputs", "stackdriver", "TestCounterCacheDayRollover"))
cc := NewCounterCache(logger.New("outputs", "stackdriver", "TestCounterCacheDayRollover"))
now := time.Now().UTC()
backdatedNow := now.Add(time.Millisecond * -1)
value := &monpb.TypedValue{