feat(outputs.graylog): implement optional connection retries (#11950)
This commit is contained in:
parent
3160d52187
commit
bc56233e1b
|
|
@ -270,8 +270,10 @@ func TestRunParser(t *testing.T) {
|
|||
func TestRunParserInvalidMsg(t *testing.T) {
|
||||
var testmsg = []byte("cpu_load_short")
|
||||
|
||||
logger := &testutil.CaptureLogger{}
|
||||
|
||||
listener, in := newTestTCPListener()
|
||||
listener.Log = &testutil.CaptureLogger{}
|
||||
listener.Log = logger
|
||||
listener.acc = &testutil.Accumulator{}
|
||||
|
||||
parser := &influx.Parser{}
|
||||
|
|
@ -283,8 +285,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
|||
in <- testmsg
|
||||
|
||||
listener.Stop()
|
||||
errmsg := listener.Log.(*testutil.CaptureLogger).LastError
|
||||
require.Contains(t, errmsg, "tcp_listener has received 1 malformed packets thus far.")
|
||||
require.Contains(t, logger.LastError(), "tcp_listener has received 1 malformed packets thus far.")
|
||||
}
|
||||
|
||||
func TestRunParserGraphiteMsg(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -51,6 +51,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## Set to true for backward compatibility.
|
||||
# name_field_no_prefix = false
|
||||
|
||||
## Connection retry options
|
||||
## Attempt to connect to the enpoints if the initial connection fails.
|
||||
## If 'false', Telegraf will give up after 3 connection attempt and will
|
||||
## exit with an error. If set to 'true', the plugin will retry to connect
|
||||
## to the unconnected endpoints infinitely.
|
||||
# connection_retry = false
|
||||
## Time to wait between connection retry attempts.
|
||||
# connection_retry_wait_time = "15s"
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
|
@ -27,12 +28,13 @@ import (
|
|||
var sampleConfig string
|
||||
|
||||
const (
|
||||
defaultEndpoint = "127.0.0.1:12201"
|
||||
defaultConnection = "wan"
|
||||
defaultMaxChunkSizeWan = 1420
|
||||
defaultMaxChunkSizeLan = 8154
|
||||
defaultScheme = "udp"
|
||||
defaultTimeout = 5 * time.Second
|
||||
defaultEndpoint = "127.0.0.1:12201"
|
||||
defaultConnection = "wan"
|
||||
defaultMaxChunkSizeWan = 1420
|
||||
defaultMaxChunkSizeLan = 8154
|
||||
defaultScheme = "udp"
|
||||
defaultTimeout = 5 * time.Second
|
||||
defaultReconnectionTime = 15 * time.Second
|
||||
)
|
||||
|
||||
var defaultSpecFields = []string{"version", "host", "short_message", "full_message", "timestamp", "level", "facility", "line", "file"}
|
||||
|
|
@ -316,10 +318,18 @@ type Graylog struct {
|
|||
ShortMessageField string `toml:"short_message_field"`
|
||||
NameFieldNoPrefix bool `toml:"name_field_noprefix"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
Reconnection bool `toml:"connection_retry"`
|
||||
ReconnectionTime config.Duration `toml:"connection_retry_wait_time"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
tlsint.ClientConfig
|
||||
|
||||
writer io.Writer
|
||||
closers []io.WriteCloser
|
||||
writer io.Writer
|
||||
closers []io.WriteCloser
|
||||
unconnected []string
|
||||
stopRetry bool
|
||||
wg sync.WaitGroup
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (*Graylog) SampleConfig() string {
|
||||
|
|
@ -327,8 +337,6 @@ func (*Graylog) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (g *Graylog) Connect() error {
|
||||
dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)}
|
||||
|
||||
if len(g.Servers) == 0 {
|
||||
g.Servers = append(g.Servers, "localhost:12201")
|
||||
}
|
||||
|
|
@ -338,22 +346,92 @@ func (g *Graylog) Connect() error {
|
|||
return err
|
||||
}
|
||||
|
||||
writers := make([]io.Writer, 0, len(g.Servers))
|
||||
for _, server := range g.Servers {
|
||||
w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg)
|
||||
err := w.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to server [%s]: %v", server, err)
|
||||
}
|
||||
writers = append(writers, w)
|
||||
g.closers = append(g.closers, w)
|
||||
if g.Reconnection {
|
||||
go g.connectRetry(tlsCfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
unconnected, gelfs := g.connectEndpoints(g.Servers, tlsCfg)
|
||||
if len(unconnected) > 0 {
|
||||
servers := strings.Join(unconnected, ",")
|
||||
return fmt.Errorf("connect: connection failed for %s", servers)
|
||||
}
|
||||
var writers []io.Writer
|
||||
var closers []io.WriteCloser
|
||||
for _, w := range gelfs {
|
||||
writers = append(writers, w)
|
||||
closers = append(closers, w)
|
||||
}
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
g.writer = io.MultiWriter(writers...)
|
||||
g.closers = closers
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Graylog) connectRetry(tlsCfg *tls.Config) {
|
||||
var writers []io.Writer
|
||||
var closers []io.WriteCloser
|
||||
var attempt int64
|
||||
|
||||
g.wg.Add(1)
|
||||
|
||||
unconnected := append([]string{}, g.Servers...)
|
||||
for {
|
||||
unconnected, gelfs := g.connectEndpoints(unconnected, tlsCfg)
|
||||
for _, w := range gelfs {
|
||||
writers = append(writers, w)
|
||||
closers = append(closers, w)
|
||||
}
|
||||
g.Lock()
|
||||
g.unconnected = unconnected
|
||||
stopRetry := g.stopRetry
|
||||
g.Unlock()
|
||||
if stopRetry {
|
||||
g.Log.Info("Stopping connection retries...")
|
||||
break
|
||||
}
|
||||
if len(unconnected) == 0 {
|
||||
break
|
||||
}
|
||||
attempt++
|
||||
servers := strings.Join(unconnected, ",")
|
||||
g.Log.Infof("Not connected to endpoints %s after attempt #%d...", servers, attempt)
|
||||
time.Sleep(time.Duration(g.ReconnectionTime))
|
||||
}
|
||||
g.Log.Info("Connected!")
|
||||
|
||||
g.Lock()
|
||||
g.writer = io.MultiWriter(writers...)
|
||||
g.closers = closers
|
||||
g.Unlock()
|
||||
|
||||
g.wg.Done()
|
||||
}
|
||||
|
||||
func (g *Graylog) connectEndpoints(servers []string, tlsCfg *tls.Config) ([]string, []gelf) {
|
||||
writers := make([]gelf, 0, len(servers))
|
||||
unconnected := make([]string, 0, len(servers))
|
||||
dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)}
|
||||
for _, server := range servers {
|
||||
w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg)
|
||||
if err := w.Connect(); err != nil {
|
||||
g.Log.Warnf("failed to connect to server [%s]: %v", server, err)
|
||||
unconnected = append(unconnected, server)
|
||||
continue
|
||||
}
|
||||
writers = append(writers, w)
|
||||
}
|
||||
return unconnected, writers
|
||||
}
|
||||
|
||||
func (g *Graylog) Close() error {
|
||||
g.Lock()
|
||||
g.stopRetry = true
|
||||
g.Unlock()
|
||||
g.wg.Wait()
|
||||
|
||||
for _, closer := range g.closers {
|
||||
_ = closer.Close()
|
||||
}
|
||||
|
|
@ -361,6 +439,17 @@ func (g *Graylog) Close() error {
|
|||
}
|
||||
|
||||
func (g *Graylog) Write(metrics []telegraf.Metric) error {
|
||||
g.Lock()
|
||||
writer := g.writer
|
||||
g.Unlock()
|
||||
|
||||
if writer == nil {
|
||||
g.Lock()
|
||||
unconnected := strings.Join(g.unconnected, ",")
|
||||
g.Unlock()
|
||||
|
||||
return fmt.Errorf("not connected to %s", unconnected)
|
||||
}
|
||||
for _, metric := range metrics {
|
||||
values, err := g.serialize(metric)
|
||||
if err != nil {
|
||||
|
|
@ -368,7 +457,7 @@ func (g *Graylog) Write(metrics []telegraf.Metric) error {
|
|||
}
|
||||
|
||||
for _, value := range values {
|
||||
_, err := g.writer.Write([]byte(value))
|
||||
_, err := writer.Write([]byte(value))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error writing message: %q, %v", value, err)
|
||||
}
|
||||
|
|
@ -444,7 +533,8 @@ func fieldInSpec(field string) bool {
|
|||
func init() {
|
||||
outputs.Add("graylog", func() telegraf.Output {
|
||||
return &Graylog{
|
||||
Timeout: config.Duration(defaultTimeout),
|
||||
Timeout: config.Duration(defaultTimeout),
|
||||
ReconnectionTime: config.Duration(defaultReconnectionTime),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,21 +10,21 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type GelfObject map[string]interface{}
|
||||
|
||||
func TestWriteUDP(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
instance Graylog
|
||||
name string
|
||||
namefieldnoprefix bool
|
||||
}{
|
||||
{
|
||||
name: "default without scheme",
|
||||
|
|
@ -33,42 +33,31 @@ func TestWriteUDP(t *testing.T) {
|
|||
name: "UDP",
|
||||
},
|
||||
{
|
||||
name: "UDP non-standard name field",
|
||||
instance: Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
},
|
||||
name: "UDP non-standard name field",
|
||||
namefieldnoprefix: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
address := make(chan string, 1)
|
||||
errs := make(chan error)
|
||||
go UDPServer(t, &wg, &tt.instance, address, errs)
|
||||
require.NoError(t, <-errs)
|
||||
|
||||
i := tt.instance
|
||||
i.Servers = []string{fmt.Sprintf("udp://%s", <-address)}
|
||||
err := i.Connect()
|
||||
require.NoError(t, err)
|
||||
defer i.Close()
|
||||
address := UDPServer(t, &wg, tt.namefieldnoprefix)
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: tt.namefieldnoprefix,
|
||||
Servers: []string{"udp://" + address},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
defer plugin.Close()
|
||||
defer wg.Wait()
|
||||
|
||||
metrics := testutil.MockMetrics()
|
||||
|
||||
// UDP scenario:
|
||||
// 4 messages are send
|
||||
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -80,56 +69,49 @@ func TestWriteTCP(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
instance Graylog
|
||||
tlsServerConfig *tls.Config
|
||||
name string
|
||||
tlsClientCfg tlsint.ClientConfig
|
||||
}{
|
||||
{
|
||||
name: "TCP",
|
||||
},
|
||||
{
|
||||
name: "TLS",
|
||||
instance: Graylog{
|
||||
ClientConfig: tlsint.ClientConfig{
|
||||
ServerName: "localhost",
|
||||
TLSCA: tlsClientConfig.TLSCA,
|
||||
TLSKey: tlsClientConfig.TLSKey,
|
||||
TLSCert: tlsClientConfig.TLSCert,
|
||||
},
|
||||
tlsClientCfg: tlsint.ClientConfig{
|
||||
ServerName: "localhost",
|
||||
TLSCA: tlsClientConfig.TLSCA,
|
||||
TLSKey: tlsClientConfig.TLSKey,
|
||||
TLSCert: tlsClientConfig.TLSCert,
|
||||
},
|
||||
tlsServerConfig: tlsServerConfig,
|
||||
},
|
||||
{
|
||||
name: "TLS no validation",
|
||||
instance: Graylog{
|
||||
ClientConfig: tlsint.ClientConfig{
|
||||
InsecureSkipVerify: true,
|
||||
ServerName: "localhost",
|
||||
TLSKey: tlsClientConfig.TLSKey,
|
||||
TLSCert: tlsClientConfig.TLSCert,
|
||||
},
|
||||
tlsClientCfg: tlsint.ClientConfig{
|
||||
InsecureSkipVerify: true,
|
||||
ServerName: "localhost",
|
||||
TLSKey: tlsClientConfig.TLSKey,
|
||||
TLSCert: tlsClientConfig.TLSCert,
|
||||
},
|
||||
tlsServerConfig: tlsServerConfig,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
address := make(chan string, 1)
|
||||
errs := make(chan error)
|
||||
fmt.Println("test: staring TCP server")
|
||||
go TCPServer(t, &wg, tt.tlsServerConfig, address, errs)
|
||||
require.NoError(t, <-errs)
|
||||
address := TCPServer(t, &wg, tlsServerConfig, errs)
|
||||
|
||||
i := tt.instance
|
||||
i.Servers = []string{fmt.Sprintf("tcp://%s", <-address)}
|
||||
fmt.Println("client: connecting to TCP server")
|
||||
err = i.Connect()
|
||||
require.NoError(t, err)
|
||||
fmt.Println("client: connected")
|
||||
defer i.Close()
|
||||
plugin := Graylog{
|
||||
ClientConfig: tlsint.ClientConfig{
|
||||
InsecureSkipVerify: true,
|
||||
ServerName: "localhost",
|
||||
TLSKey: tlsClientConfig.TLSKey,
|
||||
TLSCert: tlsClientConfig.TLSCert,
|
||||
},
|
||||
Servers: []string{"tcp://" + address},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
defer plugin.Close()
|
||||
defer wg.Wait()
|
||||
|
||||
metrics := testutil.MockMetrics()
|
||||
|
|
@ -140,38 +122,20 @@ func TestWriteTCP(t *testing.T) {
|
|||
// -> the 3rd write fails with error
|
||||
// -> during the 4th write connection is restored and write is successful
|
||||
|
||||
fmt.Println("client: writting packet 1")
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Println("client: writting packet 2")
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Println("client: checking for errors")
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
require.NoError(t, <-errs)
|
||||
|
||||
fmt.Println("client: writting packet 3")
|
||||
err = i.Write(metrics)
|
||||
|
||||
fmt.Println("client: writting packet 4")
|
||||
err = i.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
require.ErrorContains(t, plugin.Write(metrics), "error writing message")
|
||||
require.NoError(t, plugin.Write(metrics))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan string, errs chan error) {
|
||||
udpServer, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
errs <- err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
type GelfObject map[string]interface{}
|
||||
|
||||
// Send the address with the random port to the channel for the graylog instance to use it
|
||||
address <- udpServer.LocalAddr().String()
|
||||
defer udpServer.Close()
|
||||
defer wg.Done()
|
||||
func UDPServer(t *testing.T, wg *sync.WaitGroup, namefieldnoprefix bool) string {
|
||||
udpServer, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
recv := func() error {
|
||||
bufR := make([]byte, 1024)
|
||||
|
|
@ -202,7 +166,7 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan s
|
|||
return err
|
||||
}
|
||||
require.Equal(t, obj["short_message"], "telegraf")
|
||||
if config.NameFieldNoPrefix {
|
||||
if namefieldnoprefix {
|
||||
require.Equal(t, obj["name"], "test1")
|
||||
} else {
|
||||
require.Equal(t, obj["_name"], "test1")
|
||||
|
|
@ -213,37 +177,28 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, config *Graylog, address chan s
|
|||
return nil
|
||||
}
|
||||
|
||||
// in UDP scenario all 4 messages are received
|
||||
// Send the address with the random port to the channel for the graylog instance to use it
|
||||
address := udpServer.LocalAddr().String()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer udpServer.Close()
|
||||
defer wg.Done()
|
||||
|
||||
err = recv()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
err = recv()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
err = recv()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
err = recv()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
// in UDP scenario all 4 messages are received
|
||||
require.NoError(t, recv())
|
||||
require.NoError(t, recv())
|
||||
require.NoError(t, recv())
|
||||
require.NoError(t, recv())
|
||||
}()
|
||||
return address
|
||||
}
|
||||
|
||||
func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, address chan string, errs chan error) {
|
||||
func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, errs chan error) string {
|
||||
tcpServer, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
errs <- err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send the address with the random port to the channel for the graylog instance to use it
|
||||
address <- tcpServer.Addr().String()
|
||||
defer tcpServer.Close()
|
||||
defer wg.Done()
|
||||
address := tcpServer.Addr().String()
|
||||
|
||||
accept := func() (net.Conn, error) {
|
||||
conn, err := tcpServer.Accept()
|
||||
|
|
@ -294,47 +249,196 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, tlsConfig *tls.Config, address
|
|||
return nil
|
||||
}
|
||||
|
||||
fmt.Println("server: opening connection")
|
||||
conn, err := accept()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer tcpServer.Close()
|
||||
defer wg.Done()
|
||||
|
||||
// in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd
|
||||
fmt.Println("server: opening connection")
|
||||
conn, err := accept()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
fmt.Println("server: receving packet 1")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
fmt.Println("server: receving packet 2")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
// in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd
|
||||
|
||||
fmt.Println("server: closing connection")
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
fmt.Println("server: receving packet 1")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
fmt.Println("server: receving packet 2")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
errs <- err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
fmt.Println("server: closing connection")
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
fmt.Println("server: re-opening connection")
|
||||
conn, err = accept()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
errs <- err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("server: receving packet 4")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
fmt.Println("server: re-opening connection")
|
||||
conn, err = accept()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
fmt.Println("server: receving packet 4")
|
||||
err = recv(conn)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
return address
|
||||
}
|
||||
|
||||
func TestWriteUDPServerDown(t *testing.T) {
|
||||
dummy, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"udp://" + dummy.LocalAddr().String()},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, dummy.Close())
|
||||
require.NoError(t, plugin.Connect())
|
||||
}
|
||||
|
||||
func TestWriteUDPServerUnavailableOnWrite(t *testing.T) {
|
||||
dummy, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"udp://" + dummy.LocalAddr().String()},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.NoError(t, dummy.Close())
|
||||
require.NoError(t, plugin.Write(testutil.MockMetrics()))
|
||||
}
|
||||
|
||||
func TestWriteTCPServerDown(t *testing.T) {
|
||||
dummy, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"tcp://" + dummy.Addr().String()},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, dummy.Close())
|
||||
require.ErrorContains(t, plugin.Connect(), "connect: connection refused")
|
||||
}
|
||||
|
||||
func TestWriteTCPServerUnavailableOnWrite(t *testing.T) {
|
||||
dummy, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"tcp://" + dummy.Addr().String()},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.NoError(t, dummy.Close())
|
||||
err = plugin.Write(testutil.MockMetrics())
|
||||
require.ErrorContains(t, err, "error writing message")
|
||||
}
|
||||
|
||||
func TestWriteUDPServerDownRetry(t *testing.T) {
|
||||
dummy, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"udp://" + dummy.LocalAddr().String()},
|
||||
Reconnection: true,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, dummy.Close())
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.NoError(t, plugin.Close())
|
||||
}
|
||||
|
||||
func TestWriteUDPServerUnavailableOnWriteRetry(t *testing.T) {
|
||||
dummy, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"udp://" + dummy.LocalAddr().String()},
|
||||
Reconnection: true,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.NoError(t, dummy.Close())
|
||||
err = plugin.Write(testutil.MockMetrics())
|
||||
require.ErrorContains(t, err, "not connected")
|
||||
require.NoError(t, plugin.Close())
|
||||
}
|
||||
|
||||
func TestWriteTCPServerDownRetry(t *testing.T) {
|
||||
dummy, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
logger := &testutil.CaptureLogger{}
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"tcp://" + dummy.Addr().String()},
|
||||
Reconnection: true,
|
||||
ReconnectionTime: config.Duration(100 * time.Millisecond),
|
||||
Log: logger,
|
||||
}
|
||||
require.NoError(t, dummy.Close())
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.Eventually(t, func() bool {
|
||||
return strings.Contains(logger.LastError(), "after attempt #5...")
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
require.NoError(t, plugin.Close())
|
||||
}
|
||||
|
||||
func TestWriteTCPServerUnavailableOnWriteRetry(t *testing.T) {
|
||||
dummy, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"tcp://" + dummy.Addr().String()},
|
||||
Reconnection: true,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Connect())
|
||||
require.NoError(t, dummy.Close())
|
||||
err = plugin.Write(testutil.MockMetrics())
|
||||
require.ErrorContains(t, err, "not connected")
|
||||
require.NoError(t, plugin.Close())
|
||||
}
|
||||
|
||||
func TestWriteTCPRetryStopping(t *testing.T) {
|
||||
dummy, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
logger := &testutil.CaptureLogger{}
|
||||
plugin := Graylog{
|
||||
NameFieldNoPrefix: true,
|
||||
Servers: []string{"tcp://" + dummy.Addr().String()},
|
||||
Reconnection: true,
|
||||
ReconnectionTime: config.Duration(10 * time.Millisecond),
|
||||
Log: logger,
|
||||
}
|
||||
require.NoError(t, dummy.Close())
|
||||
require.NoError(t, plugin.Connect())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.NoError(t, plugin.Close())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,15 @@
|
|||
## Set to true for backward compatibility.
|
||||
# name_field_no_prefix = false
|
||||
|
||||
## Connection retry options
|
||||
## Attempt to connect to the enpoints if the initial connection fails.
|
||||
## If 'false', Telegraf will give up after 3 connection attempt and will
|
||||
## exit with an error. If set to 'true', the plugin will retry to connect
|
||||
## to the unconnected endpoints infinitely.
|
||||
# connection_retry = false
|
||||
## Time to wait between connection retry attempts.
|
||||
# connection_retry_wait_time = "15s"
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
|
|
|
|||
|
|
@ -1215,10 +1215,10 @@ func TestDBNotFoundShouldDropMetricWhenSkipDatabaseCreateIsTrue(t *testing.T) {
|
|||
err = output.Connect()
|
||||
require.NoError(t, err)
|
||||
err = output.Write(metrics)
|
||||
require.Contains(t, logger.LastError, "database not found")
|
||||
require.Contains(t, logger.LastError(), "database not found")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = output.Write(metrics)
|
||||
require.Contains(t, logger.LastError, "database not found")
|
||||
require.Contains(t, logger.LastError(), "database not found")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package testutil
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
|
@ -11,21 +12,26 @@ var _ telegraf.Logger = &CaptureLogger{}
|
|||
|
||||
// CaptureLogger defines a logging structure for plugins.
|
||||
type CaptureLogger struct {
|
||||
Name string // Name is the plugin name, will be printed in the `[]`.
|
||||
LastError string
|
||||
Name string // Name is the plugin name, will be printed in the `[]`.
|
||||
errors []string
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// Errorf logs an error message, patterned after log.Printf.
|
||||
func (l *CaptureLogger) Errorf(format string, args ...interface{}) {
|
||||
s := fmt.Sprintf("E! ["+l.Name+"] "+format, args...)
|
||||
l.LastError = s
|
||||
l.Lock()
|
||||
l.errors = append(l.errors, s)
|
||||
l.Unlock()
|
||||
log.Print(s)
|
||||
}
|
||||
|
||||
// Error logs an error message, patterned after log.Print.
|
||||
func (l *CaptureLogger) Error(args ...interface{}) {
|
||||
s := fmt.Sprint(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
|
||||
l.LastError = s
|
||||
l.Lock()
|
||||
l.errors = append(l.errors, s)
|
||||
l.Unlock()
|
||||
log.Print(s)
|
||||
}
|
||||
|
||||
|
|
@ -58,3 +64,19 @@ func (l *CaptureLogger) Infof(format string, args ...interface{}) {
|
|||
func (l *CaptureLogger) Info(args ...interface{}) {
|
||||
log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...)
|
||||
}
|
||||
|
||||
func (l *CaptureLogger) Errors() []string {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
e := append([]string{}, l.errors...)
|
||||
return e
|
||||
}
|
||||
|
||||
func (l *CaptureLogger) LastError() string {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
if len(l.errors) > 0 {
|
||||
return l.errors[len(l.errors)-1]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue