fix(inputs.socket_listener): Fix tracking of unix sockets (#13059)

This commit is contained in:
Patrick Hemmer 2023-04-11 05:46:57 -04:00 committed by GitHub
parent 797824d5fc
commit 7b31606cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 5 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
@ -15,6 +16,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -181,6 +183,16 @@ func TestSocketListener(t *testing.T) {
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
actual := acc.GetTelegrafMetrics() actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
plugin.Stop()
if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
assert.Equal(t, err, io.EOF)
}
}) })
} }
} }

View File

@ -32,7 +32,7 @@ type streamListener struct {
Log telegraf.Logger Log telegraf.Logger
listener net.Listener listener net.Listener
connections map[string]net.Conn connections map[net.Conn]struct{}
path string path string
wg sync.WaitGroup wg sync.WaitGroup
@ -123,7 +123,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {
// Store the connection mapped to its address // Store the connection mapped to its address
l.Lock() l.Lock()
l.connections[addr] = conn l.connections[conn] = struct{}{}
l.Unlock() l.Unlock()
return nil return nil
@ -134,7 +134,7 @@ func (l *streamListener) closeConnection(conn net.Conn) {
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {
l.Log.Errorf("Cannot close connection to %q: %v", addr, err) l.Log.Errorf("Cannot close connection to %q: %v", addr, err)
} }
delete(l.connections, addr) delete(l.connections, conn)
} }
func (l *streamListener) addr() net.Addr { func (l *streamListener) addr() net.Addr {
@ -147,7 +147,7 @@ func (l *streamListener) close() error {
} }
l.Lock() l.Lock()
for _, conn := range l.connections { for conn := range l.connections {
l.closeConnection(conn) l.closeConnection(conn)
} }
l.Unlock() l.Unlock()
@ -164,7 +164,7 @@ func (l *streamListener) close() error {
} }
func (l *streamListener) listen(acc telegraf.Accumulator) { func (l *streamListener) listen(acc telegraf.Accumulator) {
l.connections = make(map[string]net.Conn) l.connections = make(map[net.Conn]struct{})
l.wg.Add(1) l.wg.Add(1)
defer l.wg.Done() defer l.wg.Done()