diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index ae41e1db7..36e642b34 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -335,6 +335,9 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { timeout := time.Duration(l.ReadTimeout) scanner := bufio.NewScanner(decoder) + if l.ReadBufferSize > bufio.MaxScanTokenSize { + scanner.Buffer(make([]byte, l.ReadBufferSize), l.ReadBufferSize) + } scanner.Split(l.Splitter) for { // Set the read deadline, if any, then start reading. The read diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index ac3ca3815..31585d457 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -1,6 +1,7 @@ package socket_listener import ( + "bytes" "crypto/tls" "encoding/json" "errors" @@ -25,6 +26,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/parsers/all" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" ) @@ -195,6 +197,71 @@ func TestSocketListener(t *testing.T) { } } +func TestLargeReadBuffer(t *testing.T) { + // Construct a buffer-size setting of 100KiB + var bufsize config.Size + require.NoError(t, bufsize.UnmarshalText([]byte("1000KiB"))) + + // Setup plugin with a sufficient read buffer + plugin := &SocketListener{ + ServiceAddress: "tcp://127.0.0.1:0", + Config: socket.Config{ + ReadBufferSize: bufsize, + }, + SplitConfig: socket.SplitConfig{ + SplittingStrategy: "newline", + }, + Log: &testutil.Logger{}, + } + parser := &value.Parser{ + MetricName: "test", + DataType: "string", + } + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a large message with the readbuffer size + message := bytes.Repeat([]byte{'a'}, int(bufsize)-2) + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": string(message)}, + time.Unix(0, 0), + ), + } + + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + addr := plugin.socket.Address() + + // Setup the client for submitting data + client, err := createClient(plugin.ServiceAddress, addr, nil) + require.NoError(t, err) + defer client.Close() + + _, err = client.Write(append(message, '\n')) + require.NoError(t, err) + client.Close() + + getError := func() error { + acc.Lock() + defer acc.Unlock() + return acc.FirstError() + } + + // Test the resulting metrics and compare against expected results + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics (%d): %v", acc.NMetrics(), getError()) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + func TestCases(t *testing.T) { // Get all directories in testdata folders, err := os.ReadDir("testcases")