fix(inputs.socket_listener): Avoid noisy logs on closed connection (#13288)
This commit is contained in:
parent
727533ee55
commit
ad4df2105c
|
|
@ -136,8 +136,9 @@ func TestSocketListener(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup plugin according to test specification
|
// Setup plugin according to test specification
|
||||||
|
logger := &testutil.CaptureLogger{}
|
||||||
plugin := &SocketListener{
|
plugin := &SocketListener{
|
||||||
Log: &testutil.Logger{},
|
Log: logger,
|
||||||
ServiceAddress: proto + "://" + serverAddr,
|
ServiceAddress: proto + "://" + serverAddr,
|
||||||
ContentEncoding: tt.encoding,
|
ContentEncoding: tt.encoding,
|
||||||
ReadBufferSize: tt.buffersize,
|
ReadBufferSize: tt.buffersize,
|
||||||
|
|
@ -158,10 +159,17 @@ func TestSocketListener(t *testing.T) {
|
||||||
require.NoError(t, plugin.Start(&acc))
|
require.NoError(t, plugin.Start(&acc))
|
||||||
defer plugin.Stop()
|
defer plugin.Stop()
|
||||||
|
|
||||||
// Setup the client for submitting data
|
|
||||||
addr := plugin.listener.addr()
|
addr := plugin.listener.addr()
|
||||||
|
|
||||||
|
// Create a noop client
|
||||||
|
// Server is async, so verify no errors at the end.
|
||||||
client, err := createClient(plugin.ServiceAddress, addr, tlsCfg)
|
client, err := createClient(plugin.ServiceAddress, addr, tlsCfg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, client.Close())
|
||||||
|
|
||||||
|
// Setup the client for submitting data
|
||||||
|
client, err = createClient(plugin.ServiceAddress, addr, tlsCfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Send the data with the correct encoding
|
// Send the data with the correct encoding
|
||||||
encoder, err := internal.NewContentEncoder(tt.encoding)
|
encoder, err := internal.NewContentEncoder(tt.encoding)
|
||||||
|
|
@ -189,6 +197,8 @@ func TestSocketListener(t *testing.T) {
|
||||||
|
|
||||||
plugin.Stop()
|
plugin.Stop()
|
||||||
|
|
||||||
|
// Make sure we clear out old messages
|
||||||
|
logger.Clear()
|
||||||
if _, ok := plugin.listener.(*streamListener); ok {
|
if _, ok := plugin.listener.(*streamListener); ok {
|
||||||
// Verify that plugin.Stop() closed the client's connection
|
// Verify that plugin.Stop() closed the client's connection
|
||||||
_ = client.SetReadDeadline(time.Now().Add(time.Second))
|
_ = client.SetReadDeadline(time.Now().Add(time.Second))
|
||||||
|
|
@ -196,6 +206,9 @@ func TestSocketListener(t *testing.T) {
|
||||||
_, err = client.Read(buf)
|
_, err = client.Read(buf)
|
||||||
require.Equal(t, err, io.EOF)
|
require.Equal(t, err, io.EOF)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.Empty(t, logger.Errors())
|
||||||
|
require.Empty(t, logger.Warnings())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,13 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -133,7 +135,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {
|
||||||
|
|
||||||
func (l *streamListener) closeConnection(conn net.Conn) {
|
func (l *streamListener) closeConnection(conn net.Conn) {
|
||||||
addr := conn.RemoteAddr().String()
|
addr := conn.RemoteAddr().String()
|
||||||
if err := conn.Close(); err != nil {
|
if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
|
||||||
l.Log.Warnf("Cannot close connection to %q: %v", addr, err)
|
l.Log.Warnf("Cannot close connection to %q: %v", addr, err)
|
||||||
}
|
}
|
||||||
delete(l.connections, conn)
|
delete(l.connections, conn)
|
||||||
|
|
@ -190,7 +192,9 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
|
||||||
go func(c net.Conn) {
|
go func(c net.Conn) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := l.read(acc, c); err != nil {
|
if err := l.read(acc, c); err != nil {
|
||||||
acc.AddError(err)
|
if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) {
|
||||||
|
acc.AddError(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
l.Lock()
|
l.Lock()
|
||||||
l.closeConnection(conn)
|
l.closeConnection(conn)
|
||||||
|
|
|
||||||
|
|
@ -127,3 +127,9 @@ func (l *CaptureLogger) LastError() string {
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *CaptureLogger) Clear() {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
l.messages = make([]Entry, 0)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue