From 0e7a3c69eacd70db68eadfdb782b344ccacc4a37 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 28 Nov 2022 16:01:57 +0100 Subject: [PATCH] fix(inputs.socket_listener): ensure closed connection (#12280) --- plugins/inputs/socket_listener/stream_listener.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/socket_listener/stream_listener.go b/plugins/inputs/socket_listener/stream_listener.go index c48f29696..0c765afda 100644 --- a/plugins/inputs/socket_listener/stream_listener.go +++ b/plugins/inputs/socket_listener/stream_listener.go @@ -123,15 +123,13 @@ func (l *streamListener) setupConnection(conn net.Conn) error { // Store the connection mapped to its address l.Lock() - defer l.Unlock() l.connections[addr] = conn + l.Unlock() return nil } func (l *streamListener) closeConnection(conn net.Conn) { - l.Lock() - defer l.Unlock() addr := conn.RemoteAddr().String() if err := conn.Close(); err != nil { l.Log.Errorf("Cannot close connection to %q: %v", addr, err) @@ -148,9 +146,11 @@ func (l *streamListener) close() error { return err } + l.Lock() for _, conn := range l.connections { l.closeConnection(conn) } + l.Unlock() l.wg.Wait() if l.path != "" { @@ -184,12 +184,15 @@ func (l *streamListener) listen(acc telegraf.Accumulator) { } wg.Add(1) - go func() { + go func(c net.Conn) { defer wg.Done() - if err := l.read(acc, conn); err != nil { + if err := l.read(acc, c); err != nil { acc.AddError(err) } - }() + l.Lock() + l.closeConnection(conn) + l.Unlock() + }(conn) } wg.Wait() }