diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md index 45facb56f..7bb3b1189 100644 --- a/plugins/outputs/graphite/README.md +++ b/plugins/outputs/graphite/README.md @@ -62,7 +62,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. #] ## timeout in seconds for the write connection to graphite - timeout = 2 + # timeout = "2s" ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 4a6f80898..975c817a6 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -13,6 +13,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers/graphite" @@ -21,6 +22,14 @@ import ( //go:embed sample.conf var sampleConfig string +var ErrNotConnected = errors.New("could not write to any server in cluster") + +type connection struct { + name string + conn net.Conn + connected bool +} + type Graphite struct { GraphiteTagSupport bool `toml:"graphite_tag_support"` GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"` @@ -31,14 +40,12 @@ type Graphite struct { Prefix string `toml:"prefix"` Template string `toml:"template"` Templates []string `toml:"templates"` - Timeout int `toml:"timeout"` + Timeout config.Duration `toml:"timeout"` Log telegraf.Logger `toml:"-"` - - conns []net.Conn tlsint.ClientConfig - failedServers []string - serializer *graphite.GraphiteSerializer + connections []connection + serializer *graphite.GraphiteSerializer } func (*Graphite) SampleConfig() string { @@ -60,75 +67,67 @@ func (g *Graphite) Init() error { } g.serializer = s - return nil -} - -func (g *Graphite) Connect() error { // Set default values - if g.Timeout <= 0 { - g.Timeout = 2 - } if len(g.Servers) == 0 { g.Servers = 
append(g.Servers, "localhost:2003") } + // Fill in the connections from the server + g.connections = make([]connection, 0, len(g.Servers)) + for _, server := range g.Servers { + g.connections = append(g.connections, connection{ + name: server, + connected: false, + }) + } + + return nil +} + +func (g *Graphite) Connect() error { // Set tls config tlsConfig, err := g.ClientConfig.TLSConfig() if err != nil { return err } - // Only retry the failed servers - servers := g.Servers - if len(g.failedServers) > 0 { - servers = g.failedServers - // Remove failed server from exisiting connections - var workingConns []net.Conn - for _, conn := range g.conns { - var found bool - for _, server := range servers { - if conn.RemoteAddr().String() == server { - found = true - break - } - } - if !found { - workingConns = append(workingConns, conn) - } - } - g.conns = workingConns - } - - // Get Connections - var conns []net.Conn + // Find all non-connected servers and try to reconnect + var newConnection bool + var connectedServers int var failedServers []string - for _, server := range servers { + for i, server := range g.connections { + if server.connected { + connectedServers++ + continue + } + newConnection = true + // Dialer with timeout - d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second} + d := net.Dialer{Timeout: time.Duration(g.Timeout)} // Get secure connection if tls config is set var conn net.Conn if tlsConfig != nil { - conn, err = tls.DialWithDialer(&d, "tcp", server, tlsConfig) + conn, err = tls.DialWithDialer(&d, "tcp", server.name, tlsConfig) } else { - conn, err = d.Dial("tcp", server) + conn, err = d.Dial("tcp", server.name) } if err == nil { - conns = append(conns, conn) + g.connections[i].conn = conn + g.connections[i].connected = true + connectedServers++ } else { g.Log.Debugf("Failed to establish connection: %v", err) - failedServers = append(failedServers, server) + failedServers = append(failedServers, server.name) } } - 
g.Log.Debugf("Successful connections: %d", len(conns)) + if newConnection { + g.Log.Debugf("Successful connections: %d of %d", connectedServers, len(g.connections)) + } if len(failedServers) > 0 { g.Log.Debugf("Failed servers: %d", len(failedServers)) - g.conns = append(g.conns, conns...) - g.failedServers = failedServers - } else { - g.conns = conns } return nil @@ -136,8 +135,9 @@ func (g *Graphite) Connect() error { func (g *Graphite) Close() error { // Closing all connections - for _, conn := range g.conns { - _ = conn.Close() + for _, c := range g.connections { + _ = c.conn.Close() + c.connected = false } return nil } @@ -196,61 +196,84 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { batch = append(batch, buf...) } - err := g.send(batch) - - // If a send failed for a server, try to reconnect to that server - if len(g.failedServers) > 0 { - g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ",")) - err = g.Connect() - if err != nil { - return fmt.Errorf("failed to reconnect: %w", err) - } - err = g.send(batch) + // Try to connect to all servers not yet connected if any + if err := g.Connect(); err != nil { + return fmt.Errorf("failed to reconnect: %w", err) } - return err + // Return on success or if we encounter a non-retryable error + if err := g.send(batch); err == nil || !errors.Is(err, ErrNotConnected) { + return err + } + + // Try to reconnect and resend + failedServers := make([]string, 0, len(g.connections)) + for _, c := range g.connections { + if !c.connected { + failedServers = append(failedServers, c.name) + } + } + if len(failedServers) > 0 { + g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(failedServers, ",")) + if err := g.Connect(); err != nil { + return fmt.Errorf("failed to reconnect: %w", err) + } + } + + return g.send(batch) } func (g *Graphite) send(batch []byte) error { - // This will get set to nil if a successful write occurs - 
globalErr := errors.New("could not write to any Graphite server in cluster") + // Try sending the data to a server. Try them in random order + p := rand.Perm(len(g.connections)) + for i, n := range p { + server := g.connections[n] + + // Skip unconnected servers + if !server.connected { + continue + } - // Send data to a random server - p := rand.Perm(len(g.conns)) - for _, n := range p { if g.Timeout > 0 { - err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) - if err != nil { - g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err) - // Mark server as failed so a new connection will be made - g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) + deadline := time.Now().Add(time.Duration(g.Timeout)) + if err := server.conn.SetWriteDeadline(deadline); err != nil { + g.Log.Warnf("failed to set write deadline for %q: %v", server.name, err) + g.connections[n].connected = false + continue } } - err := g.checkEOF(g.conns[n]) - if err != nil { + + // Check the connection state + if err := g.checkEOF(server.conn); err != nil { // Mark server as failed so a new connection will be made - g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) - break + g.connections[n].connected = false + continue } - _, e := g.conns[n].Write(batch) - if e == nil { - globalErr = nil - break + _, err := server.conn.Write(batch) + if err == nil { + // Sending the data was successful + return nil + } + + g.Log.Errorf("Writing to %q failed: %v", server.name, err) + if i < len(p)-1 { + g.Log.Info("Trying next server...") } - // Error - g.Log.Debugf("Graphite Error: " + e.Error()) - // Close explicitly and let's try the next one - err = g.conns[n].Close() - g.Log.Debugf("Failed to close the connection: %v", err) // Mark server as failed so a new connection will be made - g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) + if server.conn != nil { + 
if err := server.conn.Close(); err != nil { + g.Log.Debugf("Failed to close connection to %q: %v", server.name, err) + } + } + g.connections[n].connected = false } - return globalErr + // If we end here, none of the writes were successful + return ErrNotConnected } func init() { outputs.Add("graphite", func() telegraf.Output { - return &Graphite{} + return &Graphite{Timeout: config.Duration(2 * time.Second)} }) } diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 7f3333b00..eff673789 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -2,15 +2,21 @@ package graphite import ( "bufio" + "encoding/json" + "fmt" + "io" "net" + "net/http" "net/textproto" "sync" "testing" "time" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -32,14 +38,12 @@ func TestGraphiteError(t *testing.T) { time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) // Prepare point list - var metrics []telegraf.Metric - metrics = append(metrics, m1) - // Error - err1 := g.Connect() - require.NoError(t, err1) - err2 := g.Write(metrics) - require.Error(t, err2) - require.Equal(t, "could not write to any Graphite server in cluster", err2.Error()) + metrics := []telegraf.Metric{m1} + + require.NoError(t, g.Connect()) + err := g.Write(metrics) + require.Error(t, err) + require.ErrorIs(t, err, ErrNotConnected) } func TestGraphiteReconnect(t *testing.T) { @@ -581,6 +585,95 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { require.NoError(t, err) } +func TestIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + container := testutil.Container{ + Image: "graphiteapp/graphite-statsd", + ExposedPorts: []string{"8080", "2003", 
"2004"}, + WaitingFor: wait.ForAll( + wait.ForListeningPort("8080"), + wait.ForListeningPort("2003"), + wait.ForListeningPort("2004"), + wait.ForLog("run: statsd:"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Init plugin + plugin := Graphite{ + Servers: []string{container.Address + ":" + container.Ports["2003"]}, + Template: "measurement.tags.field", + Timeout: config.Duration(2 * time.Second), + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + metrics := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"source": "foo"}, + map[string]interface{}{"value": 42.0}, + time.Now(), + ), + metric.New( + "test", + map[string]string{"source": "bar"}, + map[string]interface{}{"value": 23.0}, + time.Now(), + ), + } + + // Verify that we can successfully write data + require.NoError(t, plugin.Write(metrics)) + + // Wait for the data to settle and check if we got the metrics + url := fmt.Sprintf("http://%s:%s/metrics/index.json", container.Address, container.Ports["8080"]) + require.Eventually(t, func() bool { + var actual []string + if err := query(url, &actual); err != nil { + t.Logf("encountered error %v", err) + return false + } + var foundFoo, foundBar bool + for _, m := range actual { + switch m { + case "test.bar": + foundBar = true + case "test.foo": + foundFoo = true + default: + continue + } + if foundBar && foundFoo { + return true + } + } + return false + }, 10*time.Second, 100*time.Millisecond) +} + +func query(url string, data interface{}) error { + //nolint:gosec // Parameters are fixed in the above call + resp, err := http.Get(url) + if err != nil { + fmt.Println("response:", resp) + return err + } + raw, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("raw:", string(raw)) + return err + } + resp.Body.Close() + + return json.Unmarshal(raw, &data) +} + func TCPServer1(t *testing.T, wg 
*sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) diff --git a/plugins/outputs/graphite/sample.conf b/plugins/outputs/graphite/sample.conf index ebde1073f..01f3a4b4e 100644 --- a/plugins/outputs/graphite/sample.conf +++ b/plugins/outputs/graphite/sample.conf @@ -40,7 +40,7 @@ #] ## timeout in seconds for the write connection to graphite - timeout = 2 + # timeout = "2s" ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem"