fix(common.socket): Use read buffer size config setting as a datagram reader buffer size (#16156)

This commit is contained in:
MarekZydor 2024-11-18 15:46:58 +01:00 committed by GitHub
parent b9fa5b4c8e
commit e28c33e5d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 6 deletions

View File

@ -49,7 +49,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
go func() { go func() {
defer l.wg.Done() defer l.wg.Done()
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet buf := make([]byte, l.ReadBufferSize)
for { for {
n, src, err := l.conn.ReadFrom(buf) n, src, err := l.conn.ReadFrom(buf)
receiveTime := time.Now() receiveTime := time.Now()
@ -88,7 +88,7 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
defer l.wg.Done() defer l.wg.Done()
defer l.conn.Close() defer l.conn.Close()
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet buf := make([]byte, l.ReadBufferSize)
for { for {
// Wait for packets and read them // Wait for packets and read them
n, src, err := l.conn.ReadFrom(buf) n, src, err := l.conn.ReadFrom(buf)
@ -133,7 +133,7 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
}() }()
} }
func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { func (l *packetListener) setupUnixgram(u *url.URL, socketMode string, bufferSize int) error {
l.path = filepath.FromSlash(u.Path) l.path = filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(l.path, ":") { if runtime.GOOS == "windows" && strings.Contains(l.path, ":") {
l.path = strings.TrimPrefix(l.path, `\`) l.path = strings.TrimPrefix(l.path, `\`)
@ -162,6 +162,12 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
} }
} }
if bufferSize > 0 {
l.ReadBufferSize = bufferSize
} else {
l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size
}
return l.setupDecoder() return l.setupDecoder()
} }
@ -198,6 +204,7 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
} }
} }
l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size
l.conn = conn l.conn = conn
return l.setupDecoder() return l.setupDecoder()
} }
@ -208,6 +215,7 @@ func (l *packetListener) setupIP(u *url.URL) error {
return fmt.Errorf("listening (ip) failed: %w", err) return fmt.Errorf("listening (ip) failed: %w", err)
} }
l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size
l.conn = conn l.conn = conn
return l.setupDecoder() return l.setupDecoder()
} }

View File

@ -136,7 +136,7 @@ func (s *Socket) Setup() error {
s.listener = l s.listener = l
case "unixgram": case "unixgram":
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers) l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil { if err := l.setupUnixgram(s.url, s.SocketMode, int(s.ReadBufferSize)); err != nil {
return err return err
} }
s.listener = l s.listener = l

View File

@ -197,8 +197,8 @@ func TestSocketListener(t *testing.T) {
} }
} }
func TestLargeReadBuffer(t *testing.T) { func TestLargeReadBufferTCP(t *testing.T) {
// Construct a buffer-size setting of 100KiB // Construct a buffer-size setting of 1000KiB
var bufsize config.Size var bufsize config.Size
require.NoError(t, bufsize.UnmarshalText([]byte("1000KiB"))) require.NoError(t, bufsize.UnmarshalText([]byte("1000KiB")))
@ -262,6 +262,92 @@ func TestLargeReadBuffer(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
} }
func TestLargeReadBufferUnixgram(t *testing.T) {
// Construct a buffer-size setting of 100KiB
// Assuming that the testing environment has net.core.wmem_max set to a value greater than 100KiB
if runtime.GOOS == "windows" {
t.Skip("Skipping on Windows, as unixgram sockets are not supported")
}
if runtime.GOOS == "darwin" {
t.Skip("Skipping on macOS (darwin), as unixgram write buffer size cannot be changed (default 2048 bytes)")
}
var bufsize config.Size
require.NoError(t, bufsize.UnmarshalText([]byte("100KiB")))
// Create a socket
sock, err := os.CreateTemp("", "sock-")
require.NoError(t, err)
defer sock.Close()
defer os.Remove(sock.Name())
var serverAddr = sock.Name()
// Setup plugin with a sufficient read buffer
plugin := &SocketListener{
ServiceAddress: "unixgram" + "://" + serverAddr,
Config: socket.Config{
ReadBufferSize: bufsize,
},
Log: &testutil.Logger{},
}
parser := &value.Parser{
MetricName: "test",
DataType: "string",
}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Create a large message with the readbuffer size
message := bytes.Repeat([]byte{'a'}, int(bufsize))
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": string(message)},
time.Unix(0, 0),
),
}
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
addr := plugin.socket.Address()
// Setup the client for submitting data
client, err := createClient(plugin.ServiceAddress, addr, nil)
require.NoError(t, err)
defer client.Close()
// Check the socket write buffer size
unixConn, ok := client.(*net.UnixConn)
require.True(t, ok, "client is not a *net.UnixConn")
if err := unixConn.SetWriteBuffer(len(message)); err != nil {
t.Skipf("Failed to set write buffer size: %v. Skipping test.", err)
}
// Write the message
_, err = client.Write(message)
require.NoError(t, err)
client.Close()
getError := func() error {
acc.Lock()
defer acc.Unlock()
return acc.FirstError()
}
// Test the resulting metrics and compare against expected results
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d): %v", acc.NMetrics(), getError())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
func TestCases(t *testing.T) { func TestCases(t *testing.T) {
// Get all directories in testdata // Get all directories in testdata
folders, err := os.ReadDir("testcases") folders, err := os.ReadDir("testcases")