fix(outputs.graphite): Fix logic to reconnect with servers that were not up on agent startup (#13228) (#13239)
This commit is contained in:
parent
43048aad8c
commit
cf7ec5072d
|
|
@ -102,7 +102,9 @@ func (g *Graphite) Connect() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(g.failedServers) > 0 {
|
g.Log.Debugf("Successful connections: %d", len(conns))
|
||||||
|
if len(failedServers) > 0 {
|
||||||
|
g.Log.Debugf("Failed servers: %d", len(failedServers))
|
||||||
g.conns = append(g.conns, conns...)
|
g.conns = append(g.conns, conns...)
|
||||||
g.failedServers = failedServers
|
g.failedServers = failedServers
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,51 @@ func TestGraphiteError(t *testing.T) {
|
||||||
require.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
|
require.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGraphiteReconnect(t *testing.T) {
|
||||||
|
m := metric.New(
|
||||||
|
"mymeasurement",
|
||||||
|
map[string]string{
|
||||||
|
"host": "192.168.0.1",
|
||||||
|
"datacenter": "|us-west-2|",
|
||||||
|
},
|
||||||
|
map[string]interface{}{"myfield": float64(0.123)},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
g := Graphite{
|
||||||
|
Servers: []string{"localhost:12042"},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
GraphiteStrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`,
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("Writing metric, without any server up, expected to fail")
|
||||||
|
require.NoError(t, g.Connect())
|
||||||
|
require.Error(t, g.Write([]telegraf.Metric{m}))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
t.Log("Starting server")
|
||||||
|
tcpServer, err := net.Listen("tcp", "127.0.0.1:12042")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Log("Writing metric after server came up, we expect automatic reconnect on write without calling Connect() again")
|
||||||
|
require.NoError(t, g.Write([]telegraf.Metric{m}))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
conn, _ := (tcpServer).Accept()
|
||||||
|
reader := bufio.NewReader(conn)
|
||||||
|
tp := textproto.NewReader(reader)
|
||||||
|
data1, _ := tp.ReadLine()
|
||||||
|
require.Equal(t, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000", data1)
|
||||||
|
require.NoError(t, conn.Close())
|
||||||
|
require.NoError(t, tcpServer.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
require.NoError(t, g.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func TestGraphiteOK(t *testing.T) {
|
func TestGraphiteOK(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue