feat(outputs.graphite): Retry connecting to servers with failed send attempts (#11439)

This commit is contained in:
Sebastian Spaink 2022-07-25 14:06:30 -05:00 committed by GitHub
parent a201ae4064
commit beb18d9389
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 31 deletions

View File

@ -5,9 +5,11 @@ import (
"crypto/tls" "crypto/tls"
_ "embed" _ "embed"
"errors" "errors"
"fmt"
"io" "io"
"math/rand" "math/rand"
"net" "net"
"strings"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -34,6 +36,7 @@ type Graphite struct {
conns []net.Conn conns []net.Conn
tlsint.ClientConfig tlsint.ClientConfig
failedServers []string
} }
func (*Graphite) SampleConfig() string { func (*Graphite) SampleConfig() string {
@ -55,9 +58,31 @@ func (g *Graphite) Connect() error {
return err return err
} }
// Only retry the failed servers
servers := g.Servers
if len(g.failedServers) > 0 {
servers = g.failedServers
// Remove failed server from existing connections
var workingConns []net.Conn
for _, conn := range g.conns {
var found bool
for _, server := range servers {
if conn.RemoteAddr().String() == server {
found = true
break
}
}
if !found {
workingConns = append(workingConns, conn)
}
}
g.conns = workingConns
}
// Get Connections // Get Connections
var conns []net.Conn var conns []net.Conn
for _, server := range g.Servers { var failedServers []string
for _, server := range servers {
// Dialer with timeout // Dialer with timeout
d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second} d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second}
@ -71,9 +96,19 @@ func (g *Graphite) Connect() error {
if err == nil { if err == nil {
conns = append(conns, conn) conns = append(conns, conn)
} else {
g.Log.Debugf("Failed to establish connection: %v", err)
failedServers = append(failedServers, server)
} }
} }
g.conns = conns
if len(g.failedServers) > 0 {
g.conns = append(g.conns, conns...)
g.failedServers = failedServers
} else {
g.conns = conns
}
return nil return nil
} }
@ -90,29 +125,35 @@ func (g *Graphite) Close() error {
// We can detect that by finding an eof // We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!) // if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng` for this trick. // props to Tv via the authors of carbon-relay-ng` for this trick.
// checkEOF probes conn with a short read to find out whether the remote end
// has already closed the connection.
//
// NOTE(review): the rows in this span look like a side-by-side diff — most
// carry the pre-change text followed by the post-change text (compare the two
// signatures on the next row: the second one adds an `error` result), and
// rows that exist on one side only appear once. The description below is of
// the right-hand (post-change) variant.
//
// Post-change behaviour, as visible here:
//   - a 10ms read deadline is set; if that fails, conn is closed and the
//     value returned is the result of conn.Close();
//   - up to 1024 bytes are read; on io.EOF, conn is closed and the value
//     returned is the result of conn.Close();
//   - if any bytes were read, that is logged at info level;
//   - if the read error is not a net.Error reporting Timeout(), conn is
//     closed and the value returned is the result of conn.Close();
//   - otherwise nil is returned.
//
// NOTE(review): in all three close paths the returned error is whatever
// conn.Close() produced, not the error that caused the close. When Close
// succeeds the function therefore returns nil even though conn is no longer
// usable — confirm whether callers are meant to see a non-nil error here.
// NOTE(review): the "Failed to close the connection" debug message is logged
// unconditionally, including when Close returned nil.
func (g *Graphite) checkEOF(conn net.Conn) { func (g *Graphite) checkEOF(conn net.Conn) error {
// Scratch buffer for the probe read.
b := make([]byte, 1024) b := make([]byte, 1024)
// Keep the probe short so a healthy, idle connection only costs ~10ms.
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn) g.Log.Debugf("Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly", err, conn.RemoteAddr().String())
// err is overwritten with the Close result on the right-hand side.
_ = conn.Close() err = conn.Close()
return g.Log.Debugf("Failed to close the connection: %v", err)
return err
} }
num, err := conn.Read(b) num, err := conn.Read(b)
// io.EOF on the probe read means the peer has closed its end.
if err == io.EOF { if err == io.EOF {
g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn) g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
_ = conn.Close() err = conn.Close()
return g.Log.Debugf("Failed to close the connection: %v", err)
return err
} }
// just in case i misunderstand something or the remote behaves badly // just in case i misunderstand something or the remote behaves badly
if num != 0 { if num != 0 {
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num]) g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
} }
// Log non-timeout errors or close. // Log non-timeout errors and close.
// NOTE(review): when err is nil the type assertion yields ok == false, so
// this branch is also taken for a read that succeeded without error.
if e, ok := err.(net.Error); !(ok && e.Timeout()) { if e, ok := err.(net.Error); !(ok && e.Timeout()) {
g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err) g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
_ = conn.Close() err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}
// Read timed out: nothing to read and no EOF, so the connection is kept.
return nil
} }
// Choose a random server in the cluster to write to until a successful write // Choose a random server in the cluster to write to until a successful write
@ -135,10 +176,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
err = g.send(batch) err = g.send(batch)
// try to reconnect and retry to send // If a send failed for a server, try to reconnect to that server
if err != nil { if len(g.failedServers) > 0 {
g.Log.Error("Graphite: Reconnecting and retrying...") g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
_ = g.Connect() err = g.Connect()
if err != nil {
return fmt.Errorf("Failed to reconnect: %v", err)
}
err = g.send(batch) err = g.send(batch)
} }
@ -147,28 +191,40 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// send writes batch to one of the connections in g.conns, visiting them in
// the pseudo-random order produced by rand.Perm, and stops at the first
// successful write. It returns nil if a write succeeded and the
// "could not write to any Graphite server in cluster" error otherwise.
//
// NOTE(review): the rows in this span look like a side-by-side diff — most
// carry the pre-change text followed by the post-change text, and rows that
// exist on one side only appear once. The description below is of the
// right-hand (post-change) variant, which tracks the result in globalErr and
// records failing peers in g.failedServers.
//
// Post-change behaviour per connection, as visible here:
//   - if g.Timeout > 0, a write deadline of g.Timeout seconds is set; a
//     failure is logged and the peer's RemoteAddr is appended to
//     g.failedServers, but processing of that connection continues;
//   - checkEOF is called; on a non-nil result the peer is appended to
//     g.failedServers and the loop is left with break;
//   - the batch is written; on failure the connection is closed, the peer is
//     appended to g.failedServers and the loop moves on to the next
//     connection; on success globalErr is cleared and the loop ends.
//
// NOTE(review): the break after a checkEOF failure abandons the remaining
// connections, unlike the write-failure path ("let's try the next one") —
// confirm whether `continue` was intended.
// NOTE(review): a peer whose SetWriteDeadline fails can be appended to
// g.failedServers a second time if checkEOF or Write then fails as well.
func (g *Graphite) send(batch []byte) error { func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err := errors.New("could not write to any Graphite server in cluster") globalErr := errors.New("could not write to any Graphite server in cluster")
// Send data to a random server // Send data to a random server
p := rand.Perm(len(g.conns)) p := rand.Perm(len(g.conns))
for _, n := range p { for _, n := range p {
if g.Timeout > 0 { if g.Timeout > 0 {
_ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
if err != nil {
g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
}
}
// Probe the connection before writing to it.
err := g.checkEOF(g.conns[n])
if err != nil {
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
// NOTE(review): this exits the loop rather than trying the next connection.
break
} }
g.checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil { if _, e := g.conns[n].Write(batch); e != nil {
// Error // Error
g.Log.Errorf("Graphite Error: " + e.Error()) g.Log.Debugf("Graphite Error: " + e.Error())
// Close explicitly and let's try the next one // Close explicitly and let's try the next one
_ = g.conns[n].Close() err := g.conns[n].Close()
// NOTE(review): logged even when Close returned nil.
g.Log.Debugf("Failed to close the connection: %v", err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
} else { } else {
// Success globalErr = nil
err = nil
break break
} }
} }
return err return globalErr
} }
func init() { func init() {

View File

@ -98,7 +98,8 @@ func TestGraphiteOK(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOkWithSeparatorDot(t *testing.T) { func TestGraphiteOkWithSeparatorDot(t *testing.T) {
@ -160,7 +161,8 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
@ -222,7 +224,8 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOKWithMultipleTemplates(t *testing.T) { func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
@ -288,7 +291,8 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOkWithTags(t *testing.T) { func TestGraphiteOkWithTags(t *testing.T) {
@ -350,7 +354,8 @@ func TestGraphiteOkWithTags(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
@ -413,7 +418,8 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
@ -476,7 +482,8 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3) require.NoError(t, err3)
t.Log("Finished writing third data") t.Log("Finished writing third data")
wg2.Wait() wg2.Wait()
g.Close() err := g.Close()
require.NoError(t, err)
} }
func TCPServer1(t *testing.T, wg *sync.WaitGroup) { func TCPServer1(t *testing.T, wg *sync.WaitGroup) {