fix(inputs.socket_listener): ensure closed connection (#12280)
This commit is contained in:
parent
25154e50fd
commit
0e7a3c69ea
|
|
@ -123,15 +123,13 @@ func (l *streamListener) setupConnection(conn net.Conn) error {
|
|||
|
||||
// Store the connection mapped to its address
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.connections[addr] = conn
|
||||
l.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *streamListener) closeConnection(conn net.Conn) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
addr := conn.RemoteAddr().String()
|
||||
if err := conn.Close(); err != nil {
|
||||
l.Log.Errorf("Cannot close connection to %q: %v", addr, err)
|
||||
|
|
@ -148,9 +146,11 @@ func (l *streamListener) close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
l.Lock()
|
||||
for _, conn := range l.connections {
|
||||
l.closeConnection(conn)
|
||||
}
|
||||
l.Unlock()
|
||||
l.wg.Wait()
|
||||
|
||||
if l.path != "" {
|
||||
|
|
@ -184,12 +184,15 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
|
|||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
go func(c net.Conn) {
|
||||
defer wg.Done()
|
||||
if err := l.read(acc, conn); err != nil {
|
||||
if err := l.read(acc, c); err != nil {
|
||||
acc.AddError(err)
|
||||
}
|
||||
}()
|
||||
l.Lock()
|
||||
l.closeConnection(conn)
|
||||
l.Unlock()
|
||||
}(conn)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue