fix(common.socket): Make sure the scanner buffer matches the read-buffer size (#16111)

This commit is contained in:
Sven Rebhan 2024-10-31 22:13:24 +01:00 committed by GitHub
parent 0e9aff6217
commit 42fe362af2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 0 deletions

View File

@ -335,6 +335,9 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
timeout := time.Duration(l.ReadTimeout)
scanner := bufio.NewScanner(decoder)
if l.ReadBufferSize > bufio.MaxScanTokenSize {
scanner.Buffer(make([]byte, l.ReadBufferSize), l.ReadBufferSize)
}
scanner.Split(l.Splitter)
for {
// Set the read deadline, if any, then start reading. The read

View File

@ -1,6 +1,7 @@
package socket_listener
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
@ -25,6 +26,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
)
@ -195,6 +197,71 @@ func TestSocketListener(t *testing.T) {
}
}
func TestLargeReadBuffer(t *testing.T) {
// Construct a buffer-size setting of 100KiB
var bufsize config.Size
require.NoError(t, bufsize.UnmarshalText([]byte("1000KiB")))
// Setup plugin with a sufficient read buffer
plugin := &SocketListener{
ServiceAddress: "tcp://127.0.0.1:0",
Config: socket.Config{
ReadBufferSize: bufsize,
},
SplitConfig: socket.SplitConfig{
SplittingStrategy: "newline",
},
Log: &testutil.Logger{},
}
parser := &value.Parser{
MetricName: "test",
DataType: "string",
}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Create a large message with the readbuffer size
message := bytes.Repeat([]byte{'a'}, int(bufsize)-2)
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": string(message)},
time.Unix(0, 0),
),
}
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
addr := plugin.socket.Address()
// Setup the client for submitting data
client, err := createClient(plugin.ServiceAddress, addr, nil)
require.NoError(t, err)
defer client.Close()
_, err = client.Write(append(message, '\n'))
require.NoError(t, err)
client.Close()
getError := func() error {
acc.Lock()
defer acc.Unlock()
return acc.FirstError()
}
// Test the resulting metrics and compare against expected results
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d): %v", acc.NMetrics(), getError())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")