From bc56233e1b6dc381f11df24809671c4e1e0b2959 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 17 Nov 2022 15:03:04 +0100 Subject: [PATCH] feat(outputs.graylog): implement optional connection retries (#11950) --- .../inputs/tcp_listener/tcp_listener_test.go | 7 +- plugins/outputs/graylog/README.md | 9 + plugins/outputs/graylog/graylog.go | 132 +++++- plugins/outputs/graylog/graylog_test_linux.go | 400 +++++++++++------- plugins/outputs/graylog/sample.conf | 9 + plugins/outputs/influxdb/http_test.go | 4 +- testutil/capturelog.go | 30 +- 7 files changed, 413 insertions(+), 178 deletions(-) diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index ddd2dcabb..020aec857 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -270,8 +270,10 @@ func TestRunParser(t *testing.T) { func TestRunParserInvalidMsg(t *testing.T) { var testmsg = []byte("cpu_load_short") + logger := &testutil.CaptureLogger{} + listener, in := newTestTCPListener() - listener.Log = &testutil.CaptureLogger{} + listener.Log = logger listener.acc = &testutil.Accumulator{} parser := &influx.Parser{} @@ -283,8 +285,7 @@ func TestRunParserInvalidMsg(t *testing.T) { in <- testmsg listener.Stop() - errmsg := listener.Log.(*testutil.CaptureLogger).LastError - require.Contains(t, errmsg, "tcp_listener has received 1 malformed packets thus far.") + require.Contains(t, logger.LastError(), "tcp_listener has received 1 malformed packets thus far.") } func TestRunParserGraphiteMsg(t *testing.T) { diff --git a/plugins/outputs/graylog/README.md b/plugins/outputs/graylog/README.md index fe9b56c62..e36a0dea6 100644 --- a/plugins/outputs/graylog/README.md +++ b/plugins/outputs/graylog/README.md @@ -51,6 +51,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Set to true for backward compatibility. # name_field_no_prefix = false + ## Connection retry options + ## Attempt to connect to the enpoints if the initial connection fails. + ## If 'false', Telegraf will give up after 3 connection attempt and will + ## exit with an error. If set to 'true', the plugin will retry to connect + ## to the unconnected endpoints infinitely. + # connection_retry = false + ## Time to wait between connection retry attempts. + # connection_retry_wait_time = "15s" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go index 082b48552..9aeded2d9 100644 --- a/plugins/outputs/graylog/graylog.go +++ b/plugins/outputs/graylog/graylog.go @@ -15,6 +15,7 @@ import ( "net" "os" "strings" + "sync" "time" "github.com/influxdata/telegraf" @@ -27,12 +28,13 @@ import ( var sampleConfig string const ( - defaultEndpoint = "127.0.0.1:12201" - defaultConnection = "wan" - defaultMaxChunkSizeWan = 1420 - defaultMaxChunkSizeLan = 8154 - defaultScheme = "udp" - defaultTimeout = 5 * time.Second + defaultEndpoint = "127.0.0.1:12201" + defaultConnection = "wan" + defaultMaxChunkSizeWan = 1420 + defaultMaxChunkSizeLan = 8154 + defaultScheme = "udp" + defaultTimeout = 5 * time.Second + defaultReconnectionTime = 15 * time.Second ) var defaultSpecFields = []string{"version", "host", "short_message", "full_message", "timestamp", "level", "facility", "line", "file"} @@ -316,10 +318,18 @@ type Graylog struct { ShortMessageField string `toml:"short_message_field"` NameFieldNoPrefix bool `toml:"name_field_noprefix"` Timeout config.Duration `toml:"timeout"` + Reconnection bool `toml:"connection_retry"` + ReconnectionTime config.Duration `toml:"connection_retry_wait_time"` + Log telegraf.Logger `toml:"-"` tlsint.ClientConfig - writer io.Writer - closers []io.WriteCloser + writer io.Writer + closers []io.WriteCloser + unconnected []string + stopRetry bool + wg sync.WaitGroup + + sync.Mutex } func (*Graylog) SampleConfig() string { @@ -327,8 +337,6 @@ func (*Graylog) SampleConfig() string { } func (g *Graylog) Connect() error { - dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)} - if len(g.Servers) == 0 { g.Servers = append(g.Servers, "localhost:12201") } @@ -338,22 +346,92 @@ func (g *Graylog) Connect() error { return err } - writers := make([]io.Writer, 0, len(g.Servers)) - for _, server := range g.Servers { - w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg) - err := w.Connect() - if err != nil { - return fmt.Errorf("failed to connect to server [%s]: %v", server, err) - } - writers = append(writers, w) - g.closers = append(g.closers, w) + if g.Reconnection { + go g.connectRetry(tlsCfg) + return nil } + unconnected, gelfs := g.connectEndpoints(g.Servers, tlsCfg) + if len(unconnected) > 0 { + servers := strings.Join(unconnected, ",") + return fmt.Errorf("connect: connection failed for %s", servers) + } + var writers []io.Writer + var closers []io.WriteCloser + for _, w := range gelfs { + writers = append(writers, w) + closers = append(closers, w) + } + g.Lock() + defer g.Unlock() g.writer = io.MultiWriter(writers...) + g.closers = closers + return nil } +func (g *Graylog) connectRetry(tlsCfg *tls.Config) { + var writers []io.Writer + var closers []io.WriteCloser + var attempt int64 + + g.wg.Add(1) + + unconnected := append([]string{}, g.Servers...) + for { + unconnected, gelfs := g.connectEndpoints(unconnected, tlsCfg) + for _, w := range gelfs { + writers = append(writers, w) + closers = append(closers, w) + } + g.Lock() + g.unconnected = unconnected + stopRetry := g.stopRetry + g.Unlock() + if stopRetry { + g.Log.Info("Stopping connection retries...") + break + } + if len(unconnected) == 0 { + break + } + attempt++ + servers := strings.Join(unconnected, ",") + g.Log.Infof("Not connected to endpoints %s after attempt #%d...", servers, attempt) + time.Sleep(time.Duration(g.ReconnectionTime)) + } + g.Log.Info("Connected!") + + g.Lock() + g.writer = io.MultiWriter(writers...) + g.closers = closers + g.Unlock() + + g.wg.Done() +} + +func (g *Graylog) connectEndpoints(servers []string, tlsCfg *tls.Config) ([]string, []gelf) { + writers := make([]gelf, 0, len(servers)) + unconnected := make([]string, 0, len(servers)) + dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)} + for _, server := range servers { + w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg) + if err := w.Connect(); err != nil { + g.Log.Warnf("failed to connect to server [%s]: %v", server, err) + unconnected = append(unconnected, server) + continue + } + writers = append(writers, w) + } + return unconnected, writers +} + func (g *Graylog) Close() error { + g.Lock() + g.stopRetry = true + g.Unlock() + g.wg.Wait() + for _, closer := range g.closers { _ = closer.Close() } @@ -361,6 +439,17 @@ func (g *Graylog) Close() error { } func (g *Graylog) Write(metrics []telegraf.Metric) error { + g.Lock() + writer := g.writer + g.Unlock() + + if writer == nil { + g.Lock() + unconnected := strings.Join(g.unconnected, ",") + g.Unlock() + + return fmt.Errorf("not connected to %s", unconnected) + } for _, metric := range metrics { values, err := g.serialize(metric) if err != nil { @@ -368,7 +457,7 @@ func (g *Graylog) Write(metrics []telegraf.Metric) error { } for _, value := range values { - _, err := g.writer.Write([]byte(value)) + _, err := writer.Write([]byte(value)) if err != nil { return fmt.Errorf("error writing message: %q, %v", value, err) } @@ -444,7 +533,8 @@ func fieldInSpec(field string) bool { func init() { outputs.Add("graylog", func() telegraf.Output { return &Graylog{ - Timeout: config.Duration(defaultTimeout), + Timeout: config.Duration(defaultTimeout), + ReconnectionTime: config.Duration(defaultReconnectionTime), } }) } diff --git a/plugins/outputs/graylog/graylog_test_linux.go b/plugins/outputs/graylog/graylog_test_linux.go index c701521af..5d26f2fd6 100644 --- a/plugins/outputs/graylog/graylog_test_linux.go +++ b/plugins/outputs/graylog/graylog_test_linux.go @@ -10,21 +10,21 @@ import ( "fmt" "io" "net" + "strings" "sync" "testing" "time" + "github.com/influxdata/telegraf/config" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) -type GelfObject map[string]interface{} - func TestWriteUDP(t *testing.T) { tests := []struct { - name string - instance Graylog + name string + namefieldnoprefix bool }{ { name: "default without scheme", @@ -33,42 +33,31 @@ func TestWriteUDP(t *testing.T) { name: "UDP", }, { - name: "UDP non-standard name field", - instance: Graylog{ - NameFieldNoPrefix: true, - }, + name: "UDP non-standard name field", + namefieldnoprefix: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var wg sync.WaitGroup - wg.Add(1) - address := make(chan string, 1) - errs := make(chan error) - go UDPServer(t, &wg, &tt.instance, address, errs) - require.NoError(t, <-errs) - - i := tt.instance - i.Servers = []string{fmt.Sprintf("udp://%s", <-address)} - err := i.Connect() - require.NoError(t, err) - defer i.Close() + address := UDPServer(t, &wg, tt.namefieldnoprefix) + plugin := Graylog{ + NameFieldNoPrefix: tt.namefieldnoprefix, + Servers: []string{"udp://" + address}, + } + require.NoError(t, plugin.Connect()) + defer plugin.Close() defer wg.Wait() metrics := testutil.MockMetrics() // UDP scenario: // 4 messages are send - - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) }) } } @@ -80,56 +69,49 @@ func TestWriteTCP(t *testing.T) { require.NoError(t, err) tests := []struct { - name string - instance Graylog - tlsServerConfig *tls.Config + name string + tlsClientCfg tlsint.ClientConfig }{ { name: "TCP", }, { name: "TLS", - instance: Graylog{ - ClientConfig: tlsint.ClientConfig{ - ServerName: "localhost", - TLSCA: tlsClientConfig.TLSCA, - TLSKey: tlsClientConfig.TLSKey, - TLSCert: tlsClientConfig.TLSCert, - }, + tlsClientCfg: tlsint.ClientConfig{ + ServerName: "localhost", + TLSCA: tlsClientConfig.TLSCA, + TLSKey: tlsClientConfig.TLSKey, + TLSCert: tlsClientConfig.TLSCert, }, - tlsServerConfig: tlsServerConfig, }, { name: "TLS no validation", - instance: Graylog{ - ClientConfig: tlsint.ClientConfig{ - InsecureSkipVerify: true, - ServerName: "localhost", - TLSKey: tlsClientConfig.TLSKey, - TLSCert: tlsClientConfig.TLSCert, - }, + tlsClientCfg: tlsint.ClientConfig{ + InsecureSkipVerify: true, + ServerName: "localhost", + TLSKey: tlsClientConfig.TLSKey, + TLSCert: tlsClientConfig.TLSCert, }, - tlsServerConfig: tlsServerConfig, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var wg sync.WaitGroup - wg.Add(1) - address := make(chan string, 1) errs := make(chan error) - fmt.Println("test: staring TCP server") - go TCPServer(t, &wg, tt.tlsServerConfig, address, errs) - require.NoError(t, <-errs) + address := TCPServer(t, &wg, tlsServerConfig, errs) - i := tt.instance - i.Servers = []string{fmt.Sprintf("tcp://%s", <-address)} - fmt.Println("client: connecting to TCP server") - err = i.Connect() - require.NoError(t, err) - fmt.Println("client: connected") - defer i.Close() + plugin := Graylog{ + ClientConfig: tlsint.ClientConfig{ + InsecureSkipVerify: true, + ServerName: "localhost", + TLSKey: tlsClientConfig.TLSKey, + TLSCert: tlsClientConfig.TLSCert, + }, + Servers: []string{"tcp://" + address}, + } + require.NoError(t, plugin.Connect()) + defer plugin.Close() defer wg.Wait() metrics := testutil.MockMetrics() @@ -140,38 +122,20 @@ func TestWriteTCP(t *testing.T) { // -> the 3rd write fails with error // -> during the 4th write connection is restored and write is successful - fmt.Println("client: writting packet 1") - err = i.Write(metrics) - require.NoError(t, err) - - fmt.Println("client: writting packet 2") - err = i.Write(metrics) - require.NoError(t, err) - - fmt.Println("client: checking for errors") + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Write(metrics)) require.NoError(t, <-errs) - - fmt.Println("client: writting packet 3") - err = i.Write(metrics) - - fmt.Println("client: writting packet 4") - err = i.Write(metrics) - require.NoError(t, err) + require.ErrorContains(t, plugin.Write(metrics), "error writing message") + require.NoError(t, plugin.Write(metrics)) }) } } -func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan string, errs chan error) { - udpServer, err := net.ListenPacket("udp", "127.0.0.1:0") - errs <- err - if err != nil { - return - } +type GelfObject map[string]interface{} - // Send the address with the random port to the channel for the graylog instance to use it - address <- udpServer.LocalAddr().String() - defer udpServer.Close() - defer wg.Done() +func UDPServer(t *testing.T, wg *sync.WaitGroup, namefieldnoprefix bool) string { + udpServer, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) recv := func() error { bufR := make([]byte, 1024) @@ -202,7 +166,7 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan s return err } require.Equal(t, obj["short_message"], "telegraf") - if config.NameFieldNoPrefix { + if namefieldnoprefix { require.Equal(t, obj["name"], "test1") } else { require.Equal(t, obj["_name"], "test1") @@ -213,37 +177,28 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan s return nil } - // in UDP scenario all 4 messages are received + // Send the address with the random port to the channel for the graylog instance to use it + address := udpServer.LocalAddr().String() + wg.Add(1) + go func() { + defer udpServer.Close() + defer wg.Done() - err = recv() - if err != nil { - fmt.Println(err) - } - err = recv() - if err != nil { - fmt.Println(err) - } - err = recv() - if err != nil { - fmt.Println(err) - } - err = recv() - if err != nil { - fmt.Println(err) - } + // in UDP scenario all 4 messages are received + require.NoError(t, recv()) + require.NoError(t, recv()) + require.NoError(t, recv()) + require.NoError(t, recv()) + }() + return address } -func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, address chan string, errs chan error) { +func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, errs chan error) string { tcpServer, err := net.Listen("tcp", "127.0.0.1:0") - errs <- err - if err != nil { - return - } + require.NoError(t, err) // Send the address with the random port to the channel for the graylog instance to use it - address <- tcpServer.Addr().String() - defer tcpServer.Close() - defer wg.Done() + address := tcpServer.Addr().String() accept := func() (net.Conn, error) { conn, err := tcpServer.Accept() @@ -294,47 +249,196 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, address return nil } - fmt.Println("server: opening connection") - conn, err := accept() - if err != nil { - fmt.Println(err) - } - defer conn.Close() + wg.Add(1) + go func() { + defer tcpServer.Close() + defer wg.Done() - // in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd + fmt.Println("server: opening connection") + conn, err := accept() + if err != nil { + fmt.Println(err) + } + defer conn.Close() - fmt.Println("server: receving packet 1") - err = recv(conn) - if err != nil { - fmt.Println(err) - } - fmt.Println("server: receving packet 2") - err = recv(conn) - if err != nil { - fmt.Println(err) - } + // in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd - fmt.Println("server: closing connection") - err = conn.Close() - if err != nil { - fmt.Println(err) - } + fmt.Println("server: receving packet 1") + err = recv(conn) + if err != nil { + fmt.Println(err) + } + fmt.Println("server: receving packet 2") + err = recv(conn) + if err != nil { + fmt.Println(err) + } - errs <- err - if err != nil { - return - } + fmt.Println("server: closing connection") + err = conn.Close() + if err != nil { + fmt.Println(err) + } - fmt.Println("server: re-opening connection") - conn, err = accept() - if err != nil { - fmt.Println(err) - } - defer conn.Close() + errs <- err + if err != nil { + return + } - fmt.Println("server: receving packet 4") - err = recv(conn) - if err != nil { - fmt.Println(err) - } + fmt.Println("server: re-opening connection") + conn, err = accept() + if err != nil { + fmt.Println(err) + } + defer conn.Close() + + fmt.Println("server: receving packet 4") + err = recv(conn) + if err != nil { + fmt.Println(err) + } + }() + return address +} + +func TestWriteUDPServerDown(t *testing.T) { + dummy, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"udp://" + dummy.LocalAddr().String()}, + Log: testutil.Logger{}, + } + require.NoError(t, dummy.Close()) + require.NoError(t, plugin.Connect()) +} + +func TestWriteUDPServerUnavailableOnWrite(t *testing.T) { + dummy, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"udp://" + dummy.LocalAddr().String()}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Connect()) + require.NoError(t, dummy.Close()) + require.NoError(t, plugin.Write(testutil.MockMetrics())) +} + +func TestWriteTCPServerDown(t *testing.T) { + dummy, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"tcp://" + dummy.Addr().String()}, + Log: testutil.Logger{}, + } + require.NoError(t, dummy.Close()) + require.ErrorContains(t, plugin.Connect(), "connect: connection refused") +} + +func TestWriteTCPServerUnavailableOnWrite(t *testing.T) { + dummy, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"tcp://" + dummy.Addr().String()}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Connect()) + require.NoError(t, dummy.Close()) + err = plugin.Write(testutil.MockMetrics()) + require.ErrorContains(t, err, "error writing message") +} + +func TestWriteUDPServerDownRetry(t *testing.T) { + dummy, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"udp://" + dummy.LocalAddr().String()}, + Reconnection: true, + Log: testutil.Logger{}, + } + require.NoError(t, dummy.Close()) + require.NoError(t, plugin.Connect()) + require.NoError(t, plugin.Close()) +} + +func TestWriteUDPServerUnavailableOnWriteRetry(t *testing.T) { + dummy, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"udp://" + dummy.LocalAddr().String()}, + Reconnection: true, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Connect()) + require.NoError(t, dummy.Close()) + err = plugin.Write(testutil.MockMetrics()) + require.ErrorContains(t, err, "not connected") + require.NoError(t, plugin.Close()) +} + +func TestWriteTCPServerDownRetry(t *testing.T) { + dummy, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + logger := &testutil.CaptureLogger{} + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"tcp://" + dummy.Addr().String()}, + Reconnection: true, + ReconnectionTime: config.Duration(100 * time.Millisecond), + Log: logger, + } + require.NoError(t, dummy.Close()) + require.NoError(t, plugin.Connect()) + require.Eventually(t, func() bool { + return strings.Contains(logger.LastError(), "after attempt #5...") + }, 5*time.Second, 100*time.Millisecond) + require.NoError(t, plugin.Close()) +} + +func TestWriteTCPServerUnavailableOnWriteRetry(t *testing.T) { + dummy, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"tcp://" + dummy.Addr().String()}, + Reconnection: true, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Connect()) + require.NoError(t, dummy.Close()) + err = plugin.Write(testutil.MockMetrics()) + require.ErrorContains(t, err, "not connected") + require.NoError(t, plugin.Close()) +} + +func TestWriteTCPRetryStopping(t *testing.T) { + dummy, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + logger := &testutil.CaptureLogger{} + plugin := Graylog{ + NameFieldNoPrefix: true, + Servers: []string{"tcp://" + dummy.Addr().String()}, + Reconnection: true, + ReconnectionTime: config.Duration(10 * time.Millisecond), + Log: logger, + } + require.NoError(t, dummy.Close()) + require.NoError(t, plugin.Connect()) + time.Sleep(100 * time.Millisecond) + require.NoError(t, plugin.Close()) } diff --git a/plugins/outputs/graylog/sample.conf b/plugins/outputs/graylog/sample.conf index 9089e06a7..a07bb1be0 100644 --- a/plugins/outputs/graylog/sample.conf +++ b/plugins/outputs/graylog/sample.conf @@ -16,6 +16,15 @@ ## Set to true for backward compatibility. # name_field_no_prefix = false + ## Connection retry options + ## Attempt to connect to the enpoints if the initial connection fails. + ## If 'false', Telegraf will give up after 3 connection attempt and will + ## exit with an error. If set to 'true', the plugin will retry to connect + ## to the unconnected endpoints infinitely. + # connection_retry = false + ## Time to wait between connection retry attempts. + # connection_retry_wait_time = "15s" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index f3bdbc325..abb847b83 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -1215,10 +1215,10 @@ func TestDBNotFoundShouldDropMetricWhenSkipDatabaseCreateIsTrue(t *testing.T) { err = output.Connect() require.NoError(t, err) err = output.Write(metrics) - require.Contains(t, logger.LastError, "database not found") + require.Contains(t, logger.LastError(), "database not found") require.NoError(t, err) err = output.Write(metrics) - require.Contains(t, logger.LastError, "database not found") + require.Contains(t, logger.LastError(), "database not found") require.NoError(t, err) } diff --git a/testutil/capturelog.go b/testutil/capturelog.go index 654538b5c..8a2fcac8d 100644 --- a/testutil/capturelog.go +++ b/testutil/capturelog.go @@ -3,6 +3,7 @@ package testutil import ( "fmt" "log" + "sync" "github.com/influxdata/telegraf" ) @@ -11,21 +12,26 @@ var _ telegraf.Logger = &CaptureLogger{} // CaptureLogger defines a logging structure for plugins. type CaptureLogger struct { - Name string // Name is the plugin name, will be printed in the `[]`. - LastError string + Name string // Name is the plugin name, will be printed in the `[]`. + errors []string + sync.Mutex } // Errorf logs an error message, patterned after log.Printf. func (l *CaptureLogger) Errorf(format string, args ...interface{}) { s := fmt.Sprintf("E! ["+l.Name+"] "+format, args...) - l.LastError = s + l.Lock() + l.errors = append(l.errors, s) + l.Unlock() log.Print(s) } // Error logs an error message, patterned after log.Print. func (l *CaptureLogger) Error(args ...interface{}) { s := fmt.Sprint(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) - l.LastError = s + l.Lock() + l.errors = append(l.errors, s) + l.Unlock() log.Print(s) } @@ -58,3 +64,19 @@ func (l *CaptureLogger) Infof(format string, args ...interface{}) { func (l *CaptureLogger) Info(args ...interface{}) { log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...) } + +func (l *CaptureLogger) Errors() []string { + l.Lock() + defer l.Unlock() + e := append([]string{}, l.errors...) + return e +} + +func (l *CaptureLogger) LastError() string { + l.Lock() + defer l.Unlock() + if len(l.errors) > 0 { + return l.errors[len(l.errors)-1] + } + return "" +}