fix(outputs.graphite): Rework connection handling (#13527)

This commit is contained in:
Sven Rebhan 2023-07-05 21:15:58 +02:00 committed by GitHub
parent 762a1989ff
commit 533ede7a32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 95 deletions

View File

@ -62,7 +62,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
#] #]
## timeout in seconds for the write connection to graphite ## timeout in seconds for the write connection to graphite
timeout = 2 # timeout = "2s"
## Optional TLS Config ## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
tlsint "github.com/influxdata/telegraf/plugins/common/tls" tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
@ -21,6 +22,14 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var ErrNotConnected = errors.New("could not write to any server in cluster")
type connection struct {
name string
conn net.Conn
connected bool
}
type Graphite struct { type Graphite struct {
GraphiteTagSupport bool `toml:"graphite_tag_support"` GraphiteTagSupport bool `toml:"graphite_tag_support"`
GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"` GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`
@ -31,14 +40,12 @@ type Graphite struct {
Prefix string `toml:"prefix"` Prefix string `toml:"prefix"`
Template string `toml:"template"` Template string `toml:"template"`
Templates []string `toml:"templates"` Templates []string `toml:"templates"`
Timeout int `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
conns []net.Conn
tlsint.ClientConfig tlsint.ClientConfig
failedServers []string
serializer *graphite.GraphiteSerializer connections []connection
serializer *graphite.GraphiteSerializer
} }
func (*Graphite) SampleConfig() string { func (*Graphite) SampleConfig() string {
@ -60,75 +67,67 @@ func (g *Graphite) Init() error {
} }
g.serializer = s g.serializer = s
return nil
}
func (g *Graphite) Connect() error {
// Set default values // Set default values
if g.Timeout <= 0 {
g.Timeout = 2
}
if len(g.Servers) == 0 { if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:2003") g.Servers = append(g.Servers, "localhost:2003")
} }
// Fill in the connections from the server
g.connections = make([]connection, 0, len(g.Servers))
for _, server := range g.Servers {
g.connections = append(g.connections, connection{
name: server,
connected: false,
})
}
return nil
}
func (g *Graphite) Connect() error {
// Set tls config // Set tls config
tlsConfig, err := g.ClientConfig.TLSConfig() tlsConfig, err := g.ClientConfig.TLSConfig()
if err != nil { if err != nil {
return err return err
} }
// Only retry the failed servers // Find all non-connected servers and try to reconnect
servers := g.Servers var newConnection bool
if len(g.failedServers) > 0 { var connectedServers int
servers = g.failedServers
// Remove failed server from exisiting connections
var workingConns []net.Conn
for _, conn := range g.conns {
var found bool
for _, server := range servers {
if conn.RemoteAddr().String() == server {
found = true
break
}
}
if !found {
workingConns = append(workingConns, conn)
}
}
g.conns = workingConns
}
// Get Connections
var conns []net.Conn
var failedServers []string var failedServers []string
for _, server := range servers { for i, server := range g.connections {
if server.connected {
connectedServers++
continue
}
newConnection = true
// Dialer with timeout // Dialer with timeout
d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second} d := net.Dialer{Timeout: time.Duration(g.Timeout)}
// Get secure connection if tls config is set // Get secure connection if tls config is set
var conn net.Conn var conn net.Conn
if tlsConfig != nil { if tlsConfig != nil {
conn, err = tls.DialWithDialer(&d, "tcp", server, tlsConfig) conn, err = tls.DialWithDialer(&d, "tcp", server.name, tlsConfig)
} else { } else {
conn, err = d.Dial("tcp", server) conn, err = d.Dial("tcp", server.name)
} }
if err == nil { if err == nil {
conns = append(conns, conn) g.connections[i].conn = conn
g.connections[i].connected = true
connectedServers++
} else { } else {
g.Log.Debugf("Failed to establish connection: %v", err) g.Log.Debugf("Failed to establish connection: %v", err)
failedServers = append(failedServers, server) failedServers = append(failedServers, server.name)
} }
} }
g.Log.Debugf("Successful connections: %d", len(conns)) if newConnection {
g.Log.Debugf("Successful connections: %d of %d", connectedServers, len(g.connections))
}
if len(failedServers) > 0 { if len(failedServers) > 0 {
g.Log.Debugf("Failed servers: %d", len(failedServers)) g.Log.Debugf("Failed servers: %d", len(failedServers))
g.conns = append(g.conns, conns...)
g.failedServers = failedServers
} else {
g.conns = conns
} }
return nil return nil
@ -136,8 +135,9 @@ func (g *Graphite) Connect() error {
func (g *Graphite) Close() error { func (g *Graphite) Close() error {
// Closing all connections // Closing all connections
for _, conn := range g.conns { for _, c := range g.connections {
_ = conn.Close() _ = c.conn.Close()
c.connected = false
} }
return nil return nil
} }
@ -196,61 +196,84 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
batch = append(batch, buf...) batch = append(batch, buf...)
} }
err := g.send(batch) // Try to connect to all servers not yet connected if any
if err := g.Connect(); err != nil {
// If a send failed for a server, try to reconnect to that server return fmt.Errorf("failed to reconnect: %w", err)
if len(g.failedServers) > 0 {
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
err = g.Connect()
if err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}
err = g.send(batch)
} }
return err // Return on success of if we encounter a non-retryable error
if err := g.send(batch); err == nil || !errors.Is(err, ErrNotConnected) {
return err
}
// Try to reconnect and resend
failedServers := make([]string, 0, len(g.connections))
for _, c := range g.connections {
if !c.connected {
failedServers = append(failedServers, c.name)
}
}
if len(failedServers) > 0 {
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(failedServers, ","))
if err := g.Connect(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}
}
return g.send(batch)
} }
func (g *Graphite) send(batch []byte) error { func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs // Try sending the data to a server. Try them in random order
globalErr := errors.New("could not write to any Graphite server in cluster") p := rand.Perm(len(g.connections))
for i, n := range p {
server := g.connections[n]
// Skip unconnected servers
if !server.connected {
continue
}
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if g.Timeout > 0 { if g.Timeout > 0 {
err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) deadline := time.Now().Add(time.Duration(g.Timeout))
if err != nil { if err := server.conn.SetWriteDeadline(deadline); err != nil {
g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err) g.Log.Warnf("failed to set write deadline for %q: %v", server.name, err)
// Mark server as failed so a new connection will be made g.connections[n].connected = false
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) continue
} }
} }
err := g.checkEOF(g.conns[n])
if err != nil { // Check the connection state
if err := g.checkEOF(server.conn); err != nil {
// Mark server as failed so a new connection will be made // Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) g.connections[n].connected = false
break continue
} }
_, e := g.conns[n].Write(batch) _, err := server.conn.Write(batch)
if e == nil { if err == nil {
globalErr = nil // Sending the data was successfully
break return nil
}
g.Log.Errorf("Writing to %q failed: %v", server.name, err)
if i < len(p)-1 {
g.Log.Info("Trying next server...")
} }
// Error
g.Log.Debugf("Graphite Error: " + e.Error())
// Close explicitly and let's try the next one
err = g.conns[n].Close()
g.Log.Debugf("Failed to close the connection: %v", err)
// Mark server as failed so a new connection will be made // Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String()) if server.conn != nil {
if err := server.conn.Close(); err != nil {
g.Log.Debugf("Failed to close connection to %q: %v", server.name, err)
}
}
g.connections[n].connected = false
} }
return globalErr // If we end here, none of the writes were successful
return ErrNotConnected
} }
func init() { func init() {
outputs.Add("graphite", func() telegraf.Output { outputs.Add("graphite", func() telegraf.Output {
return &Graphite{} return &Graphite{Timeout: config.Duration(2 * time.Second)}
}) })
} }

View File

@ -2,15 +2,21 @@ package graphite
import ( import (
"bufio" "bufio"
"encoding/json"
"fmt"
"io"
"net" "net"
"net/http"
"net/textproto" "net/textproto"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -32,14 +38,12 @@ func TestGraphiteError(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
// Prepare point list // Prepare point list
var metrics []telegraf.Metric metrics := []telegraf.Metric{m1}
metrics = append(metrics, m1)
// Error require.NoError(t, g.Connect())
err1 := g.Connect() err := g.Write(metrics)
require.NoError(t, err1) require.Error(t, err)
err2 := g.Write(metrics) require.ErrorIs(t, err, ErrNotConnected)
require.Error(t, err2)
require.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
} }
func TestGraphiteReconnect(t *testing.T) { func TestGraphiteReconnect(t *testing.T) {
@ -581,6 +585,95 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "graphiteapp/graphite-statsd",
ExposedPorts: []string{"8080", "2003", "2004"},
WaitingFor: wait.ForAll(
wait.ForListeningPort("8080"),
wait.ForListeningPort("2003"),
wait.ForListeningPort("2004"),
wait.ForLog("run: statsd:"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Init plugin
plugin := Graphite{
Servers: []string{container.Address + ":" + container.Ports["2003"]},
Template: "measurement.tags.field",
Timeout: config.Duration(2 * time.Second),
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()
metrics := []telegraf.Metric{
metric.New(
"test",
map[string]string{"source": "foo"},
map[string]interface{}{"value": 42.0},
time.Now(),
),
metric.New(
"test",
map[string]string{"source": "bar"},
map[string]interface{}{"value": 23.0},
time.Now(),
),
}
// Verify that we can successfully write data
require.NoError(t, plugin.Write(metrics))
// Wait for the data to settle and check if we got the metrics
url := fmt.Sprintf("http://%s:%s/metrics/index.json", container.Address, container.Ports["8080"])
require.Eventually(t, func() bool {
var actual []string
if err := query(url, &actual); err != nil {
t.Logf("encountered error %v", err)
return false
}
var foundFoo, foundBar bool
for _, m := range actual {
switch m {
case "test.bar":
foundBar = true
case "test.foo":
foundFoo = true
default:
continue
}
if foundBar && foundFoo {
return true
}
}
return false
}, 10*time.Second, 100*time.Millisecond)
}
func query(url string, data interface{}) error {
//nolint:gosec // Parameters are fixed in the above call
resp, err := http.Get(url)
if err != nil {
fmt.Println("response:", resp)
return err
}
raw, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("raw:", string(raw))
return err
}
resp.Body.Close()
return json.Unmarshal(raw, &data)
}
func TCPServer1(t *testing.T, wg *sync.WaitGroup) { func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err) require.NoError(t, err)

View File

@ -40,7 +40,7 @@
#] #]
## timeout in seconds for the write connection to graphite ## timeout in seconds for the write connection to graphite
timeout = 2 # timeout = "2s"
## Optional TLS Config ## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"