From f098e5f9f60607917877697f6727133d070c062c Mon Sep 17 00:00:00 2001 From: Patrick Hemmer Date: Wed, 17 May 2023 14:34:53 -0400 Subject: [PATCH] fix(inputs.socket_listener): Fix loss of connection tracking (#13056) --- .../socket_listener/socket_listener_test.go | 7 ++-- .../inputs/socket_listener/stream_listener.go | 35 +++++++++---------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 11b7b80ac..0dcd4448d 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -184,6 +183,10 @@ func TestSocketListener(t *testing.T) { actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + if sl, ok := plugin.listener.(*streamListener); ok { + require.NotEmpty(t, sl.connections) + } + plugin.Stop() if _, ok := plugin.listener.(*streamListener); ok { @@ -191,7 +194,7 @@ func TestSocketListener(t *testing.T) { _ = client.SetReadDeadline(time.Now().Add(time.Second)) buf := []byte{1} _, err = client.Read(buf) - assert.Equal(t, err, io.EOF) + require.Equal(t, err, io.EOF) } }) } diff --git a/plugins/inputs/socket_listener/stream_listener.go b/plugins/inputs/socket_listener/stream_listener.go index aeb63fcd6..0a77c9429 100644 --- a/plugins/inputs/socket_listener/stream_listener.go +++ b/plugins/inputs/socket_listener/stream_listener.go @@ -86,6 +86,17 @@ func (l *streamListener) setupConnection(conn net.Conn) error { conn = c.NetConn() } + addr := conn.RemoteAddr().String() + l.Lock() + if l.MaxConnections > 0 && len(l.connections) >= l.MaxConnections { + l.Unlock() + // Ignore the returned error as we cannot do anything about it anyway + _ = conn.Close() + return fmt.Errorf("unable to accept connection from %q: too many connections", addr) + } + l.connections[conn] = struct{}{} + l.Unlock() + if l.ReadBufferSize > 0 { if rb, ok := conn.(hasSetReadBuffer); ok { if err := rb.SetReadBuffer(l.ReadBufferSize); err != nil { @@ -96,47 +107,34 @@ func (l *streamListener) setupConnection(conn net.Conn) error { } } - addr := conn.RemoteAddr().String() - if l.MaxConnections > 0 && len(l.connections) >= l.MaxConnections { - // Ignore the returned error as we cannot do anything about it anyway - _ = conn.Close() - l.Log.Infof("unable to accept connection from %q: too many connections", addr) - return nil - } - // Set keep alive handlings if l.KeepAlivePeriod != nil { tcpConn, ok := conn.(*net.TCPConn) if !ok { - return fmt.Errorf("cannot set keep-alive: not a TCP connection (%T)", conn) + l.Log.Warnf("connection not a TCP connection (%T)", conn) } if *l.KeepAlivePeriod == 0 { if err := tcpConn.SetKeepAlive(false); err != nil { - return fmt.Errorf("cannot set keep-alive: %w", err) + l.Log.Warnf("Cannot set keep-alive: %w", err) } } else { if err := tcpConn.SetKeepAlive(true); err != nil { - return fmt.Errorf("cannot set keep-alive: %w", err) + l.Log.Warnf("Cannot set keep-alive: %w", err) } err := tcpConn.SetKeepAlivePeriod(time.Duration(*l.KeepAlivePeriod)) if err != nil { - return fmt.Errorf("cannot set keep-alive period: %w", err) + l.Log.Warnf("Cannot set keep-alive period: %w", err) } } } - // Store the connection mapped to its address - l.Lock() - l.connections[conn] = struct{}{} - l.Unlock() - return nil } func (l *streamListener) closeConnection(conn net.Conn) { addr := conn.RemoteAddr().String() if err := conn.Close(); err != nil { - l.Log.Errorf("Cannot close connection to %q: %v", addr, err) + l.Log.Warnf("Cannot close connection to %q: %v", addr, err) } delete(l.connections, conn) } @@ -185,6 +183,7 @@ func (l *streamListener) listen(acc telegraf.Accumulator) { if err := l.setupConnection(conn); err != nil { acc.AddError(err) + continue } wg.Add(1)