diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 6d4048aee..ba914ef47 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -5,9 +5,11 @@ import ( "crypto/tls" _ "embed" "errors" + "fmt" "io" "math/rand" "net" + "strings" "time" "github.com/influxdata/telegraf" @@ -34,6 +36,7 @@ type Graphite struct { conns []net.Conn tlsint.ClientConfig + failedServers []string } func (*Graphite) SampleConfig() string { @@ -55,9 +58,31 @@ func (g *Graphite) Connect() error { return err } + // Only retry the failed servers + servers := g.Servers + if len(g.failedServers) > 0 { + servers = g.failedServers + // Remove failed server from exisiting connections + var workingConns []net.Conn + for _, conn := range g.conns { + var found bool + for _, server := range servers { + if conn.RemoteAddr().String() == server { + found = true + break + } + } + if !found { + workingConns = append(workingConns, conn) + } + } + g.conns = workingConns + } + // Get Connections var conns []net.Conn - for _, server := range g.Servers { + var failedServers []string + for _, server := range servers { // Dialer with timeout d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second} @@ -71,9 +96,19 @@ func (g *Graphite) Connect() error { if err == nil { conns = append(conns, conn) + } else { + g.Log.Debugf("Failed to establish connection: %v", err) + failedServers = append(failedServers, server) } } - g.conns = conns + + if len(g.failedServers) > 0 { + g.conns = append(g.conns, conns...) + g.failedServers = failedServers + } else { + g.conns = conns + } + return nil } @@ -90,29 +125,35 @@ func (g *Graphite) Close() error { // We can detect that by finding an eof // if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!) // props to Tv via the authors of carbon-relay-ng` for this trick. -func (g *Graphite) checkEOF(conn net.Conn) { +func (g *Graphite) checkEOF(conn net.Conn) error { b := make([]byte, 1024) if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil { - g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn) - _ = conn.Close() - return + g.Log.Debugf("Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly", err, conn.RemoteAddr().String()) + err = conn.Close() + g.Log.Debugf("Failed to close the connection: %v", err) + return err } num, err := conn.Read(b) if err == io.EOF { - g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn) - _ = conn.Close() - return + g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String()) + err = conn.Close() + g.Log.Debugf("Failed to close the connection: %v", err) + return err } // just in case i misunderstand something or the remote behaves badly if num != 0 { g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num]) } - // Log non-timeout errors or close. + // Log non-timeout errors and close. if e, ok := err.(net.Error); !(ok && e.Timeout()) { - g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err) - _ = conn.Close() + g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err) + err = conn.Close() + g.Log.Debugf("Failed to close the connection: %v", err) + return err } + + return nil } // Choose a random server in the cluster to write to until a successful write @@ -135,10 +176,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { err = g.send(batch) - // try to reconnect and retry to send - if err != nil { - g.Log.Error("Graphite: Reconnecting and retrying...") - _ = g.Connect() + // If a send failed for a server, try to reconnect to that server + if len(g.failedServers) > 0 { + g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ",")) + err = g.Connect() + if err != nil { + return fmt.Errorf("Failed to reconnect: %v", err) + } err = g.send(batch) } @@ -147,28 +191,40 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) send(batch []byte) error { // This will get set to nil if a successful write occurs - err := errors.New("could not write to any Graphite server in cluster") + globalErr := errors.New("could not write to any Graphite server in cluster") // Send data to a random server p := rand.Perm(len(g.conns)) for _, n := range p { if g.Timeout > 0 { - _ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) + err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) + if err != nil { + g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err) + // Mark server as failed so a new connection will be made + g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) + } + } + err := g.checkEOF(g.conns[n]) + if err != nil { + // Mark server as failed so a new connection will be made + g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) + break } - g.checkEOF(g.conns[n]) if _, e := g.conns[n].Write(batch); e != nil { // Error - g.Log.Errorf("Graphite Error: " + e.Error()) + g.Log.Debugf("Graphite Error: " + e.Error()) // Close explicitly and let's try the next one - _ = g.conns[n].Close() + err := g.conns[n].Close() + g.Log.Debugf("Failed to close the connection: %v", err) + // Mark server as failed so a new connection will be made + g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) } else { - // Success - err = nil + globalErr = nil break } } - return err + return globalErr } func init() { diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index d1bde0f48..d686b4d52 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -98,7 +98,8 @@ func TestGraphiteOK(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOkWithSeparatorDot(t *testing.T) { @@ -160,7 +161,8 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { @@ -222,7 +224,8 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOKWithMultipleTemplates(t *testing.T) { @@ -288,7 +291,8 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOkWithTags(t *testing.T) { @@ -350,7 +354,8 @@ func TestGraphiteOkWithTags(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { @@ -413,7 +418,8 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { @@ -476,7 +482,8 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { require.NoError(t, err3) t.Log("Finished writing third data") wg2.Wait() - g.Close() + err := g.Close() + require.NoError(t, err) } func TCPServer1(t *testing.T, wg *sync.WaitGroup) {