feat(common.socket): Allow parallel parsing with a pool of workers (#15891)
This commit is contained in:
parent
f4f7a63860
commit
c0a365686b
|
|
@ -13,7 +13,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/alitto/pond"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -25,9 +28,18 @@ type packetListener struct {
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
|
|
||||||
conn net.PacketConn
|
conn net.PacketConn
|
||||||
decoder internal.ContentDecoder
|
decoders sync.Pool
|
||||||
path string
|
path string
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
parsePool *pond.WorkerPool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int) *packetListener {
|
||||||
|
return &packetListener{
|
||||||
|
Encoding: encoding,
|
||||||
|
MaxDecompressionSize: int64(maxDecompressionSize),
|
||||||
|
parsePool: pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
|
func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
|
||||||
|
|
@ -48,7 +60,12 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := l.decoder.Decode(buf[:n])
|
d := make([]byte, n)
|
||||||
|
copy(d, buf[:n])
|
||||||
|
l.parsePool.Submit(func() {
|
||||||
|
decoder := l.decoders.Get().(internal.ContentDecoder)
|
||||||
|
defer l.decoders.Put(decoder)
|
||||||
|
body, err := decoder.Decode(d)
|
||||||
if err != nil && onError != nil {
|
if err != nil && onError != nil {
|
||||||
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
||||||
}
|
}
|
||||||
|
|
@ -56,7 +73,9 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
|
||||||
if l.path != "" {
|
if l.path != "" {
|
||||||
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
|
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
|
||||||
}
|
}
|
||||||
|
|
||||||
onData(src, body)
|
onData(src, body)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
@ -80,8 +99,15 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d := make([]byte, n)
|
||||||
|
copy(d, buf[:n])
|
||||||
|
l.parsePool.Submit(func() {
|
||||||
// Decode the contents depending on the given encoding
|
// Decode the contents depending on the given encoding
|
||||||
body, err := l.decoder.Decode(buf[:n])
|
decoder := l.decoders.Get().(internal.ContentDecoder)
|
||||||
|
// Not possible to immediately return the decoder to the Pool after calling Decode, because some
|
||||||
|
// decoders return a reference to their internal buffers. This would cause data races.
|
||||||
|
defer l.decoders.Put(decoder)
|
||||||
|
body, err := decoder.Decode(d[:n])
|
||||||
if err != nil && onError != nil {
|
if err != nil && onError != nil {
|
||||||
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
||||||
}
|
}
|
||||||
|
|
@ -93,13 +119,14 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
|
||||||
|
|
||||||
// Create a pipe and notify the caller via Callback that new data is
|
// Create a pipe and notify the caller via Callback that new data is
|
||||||
// available. Afterwards write the data. Please note: Write() will
|
// available. Afterwards write the data. Please note: Write() will
|
||||||
// blocks until all data is consumed!
|
// block until all data is consumed!
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
go onConnection(src, reader)
|
go onConnection(src, reader)
|
||||||
if _, err := writer.Write(body); err != nil && onError != nil {
|
if _, err := writer.Write(body); err != nil && onError != nil {
|
||||||
onError(err)
|
onError(err)
|
||||||
}
|
}
|
||||||
writer.Close()
|
writer.Close()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
@ -133,18 +160,7 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a decoder for the given encoding
|
return l.setupDecoder()
|
||||||
var options []internal.DecodingOption
|
|
||||||
if l.MaxDecompressionSize > 0 {
|
|
||||||
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
|
|
||||||
}
|
|
||||||
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating decoder failed: %w", err)
|
|
||||||
}
|
|
||||||
l.decoder = decoder
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error {
|
func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error {
|
||||||
|
|
@ -179,20 +195,9 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
|
||||||
l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err)
|
l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
l.conn = conn
|
l.conn = conn
|
||||||
|
return l.setupDecoder()
|
||||||
// Create a decoder for the given encoding
|
|
||||||
var options []internal.DecodingOption
|
|
||||||
if l.MaxDecompressionSize > 0 {
|
|
||||||
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
|
|
||||||
}
|
|
||||||
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating decoder failed: %w", err)
|
|
||||||
}
|
|
||||||
l.decoder = decoder
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *packetListener) setupIP(u *url.URL) error {
|
func (l *packetListener) setupIP(u *url.URL) error {
|
||||||
|
|
@ -200,18 +205,27 @@ func (l *packetListener) setupIP(u *url.URL) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listening (ip) failed: %w", err)
|
return fmt.Errorf("listening (ip) failed: %w", err)
|
||||||
}
|
}
|
||||||
l.conn = conn
|
|
||||||
|
|
||||||
|
l.conn = conn
|
||||||
|
return l.setupDecoder()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *packetListener) setupDecoder() error {
|
||||||
// Create a decoder for the given encoding
|
// Create a decoder for the given encoding
|
||||||
var options []internal.DecodingOption
|
var options []internal.DecodingOption
|
||||||
if l.MaxDecompressionSize > 0 {
|
if l.MaxDecompressionSize > 0 {
|
||||||
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
|
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.decoders = sync.Pool{New: func() any {
|
||||||
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
|
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating decoder failed: %w", err)
|
l.Log.Errorf("creating decoder failed: %v", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
l.decoder = decoder
|
|
||||||
|
return decoder
|
||||||
|
}}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -237,5 +251,7 @@ func (l *packetListener) close() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.parsePool.StopAndWait()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ type Config struct {
|
||||||
SocketMode string `toml:"socket_mode"`
|
SocketMode string `toml:"socket_mode"`
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
||||||
|
MaxParallelParsers int `toml:"max_parallel_parsers"`
|
||||||
common_tls.ServerConfig
|
common_tls.ServerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,74 +97,54 @@ func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger teleg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Socket) Setup() error {
|
func (s *Socket) Setup() error {
|
||||||
|
s.MaxParallelParsers = max(s.MaxParallelParsers, 1)
|
||||||
switch s.url.Scheme {
|
switch s.url.Scheme {
|
||||||
case "tcp", "tcp4", "tcp6":
|
case "tcp", "tcp4", "tcp6":
|
||||||
l := &streamListener{
|
l := newStreamListener(
|
||||||
ReadBufferSize: int(s.ReadBufferSize),
|
s.Config,
|
||||||
ReadTimeout: s.ReadTimeout,
|
s.splitter,
|
||||||
KeepAlivePeriod: s.KeepAlivePeriod,
|
s.log,
|
||||||
MaxConnections: s.MaxConnections,
|
)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
Splitter: s.splitter,
|
|
||||||
Log: s.log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
|
if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = l
|
s.listener = l
|
||||||
case "unix", "unixpacket":
|
case "unix", "unixpacket":
|
||||||
l := &streamListener{
|
l := newStreamListener(
|
||||||
ReadBufferSize: int(s.ReadBufferSize),
|
s.Config,
|
||||||
ReadTimeout: s.ReadTimeout,
|
s.splitter,
|
||||||
KeepAlivePeriod: s.KeepAlivePeriod,
|
s.log,
|
||||||
MaxConnections: s.MaxConnections,
|
)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
Splitter: s.splitter,
|
|
||||||
Log: s.log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
|
if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = l
|
s.listener = l
|
||||||
case "udp", "udp4", "udp6":
|
case "udp", "udp4", "udp6":
|
||||||
l := &packetListener{
|
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
|
||||||
}
|
|
||||||
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
|
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = l
|
s.listener = l
|
||||||
case "ip", "ip4", "ip6":
|
case "ip", "ip4", "ip6":
|
||||||
l := &packetListener{
|
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
|
||||||
}
|
|
||||||
if err := l.setupIP(s.url); err != nil {
|
if err := l.setupIP(s.url); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = l
|
s.listener = l
|
||||||
case "unixgram":
|
case "unixgram":
|
||||||
l := &packetListener{
|
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
|
||||||
}
|
|
||||||
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
|
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = l
|
s.listener = l
|
||||||
case "vsock":
|
case "vsock":
|
||||||
l := &streamListener{
|
l := newStreamListener(
|
||||||
ReadBufferSize: int(s.ReadBufferSize),
|
s.Config,
|
||||||
ReadTimeout: s.ReadTimeout,
|
s.splitter,
|
||||||
KeepAlivePeriod: s.KeepAlivePeriod,
|
s.log,
|
||||||
MaxConnections: s.MaxConnections,
|
)
|
||||||
Encoding: s.ContentEncoding,
|
|
||||||
Splitter: s.splitter,
|
|
||||||
Log: s.log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := l.setupVsock(s.url); err != nil {
|
if err := l.setupVsock(s.url); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/alitto/pond"
|
||||||
"github.com/mdlayher/vsock"
|
"github.com/mdlayher/vsock"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -43,11 +44,29 @@ type streamListener struct {
|
||||||
connections uint64
|
connections uint64
|
||||||
path string
|
path string
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
parsePool *pond.WorkerPool
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newStreamListener(conf Config, splitter bufio.SplitFunc, log telegraf.Logger) *streamListener {
|
||||||
|
return &streamListener{
|
||||||
|
ReadBufferSize: int(conf.ReadBufferSize),
|
||||||
|
ReadTimeout: conf.ReadTimeout,
|
||||||
|
KeepAlivePeriod: conf.KeepAlivePeriod,
|
||||||
|
MaxConnections: conf.MaxConnections,
|
||||||
|
Encoding: conf.ContentEncoding,
|
||||||
|
Splitter: splitter,
|
||||||
|
Log: log,
|
||||||
|
|
||||||
|
parsePool: pond.New(
|
||||||
|
conf.MaxParallelParsers,
|
||||||
|
0,
|
||||||
|
pond.MinWorkers(conf.MaxParallelParsers/2+1)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error {
|
func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error {
|
||||||
var err error
|
var err error
|
||||||
if tlsCfg == nil {
|
if tlsCfg == nil {
|
||||||
|
|
@ -216,6 +235,9 @@ func (l *streamListener) close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.parsePool.StopAndWait()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -334,8 +356,13 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
|
||||||
if l.path != "" {
|
if l.path != "" {
|
||||||
src = &net.UnixAddr{Name: l.path, Net: "unix"}
|
src = &net.UnixAddr{Name: l.path, Net: "unix"}
|
||||||
}
|
}
|
||||||
|
|
||||||
data := scanner.Bytes()
|
data := scanner.Bytes()
|
||||||
onData(src, data)
|
d := make([]byte, len(data))
|
||||||
|
copy(d, data)
|
||||||
|
l.parsePool.Submit(func() {
|
||||||
|
onData(src, d)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
|
|
@ -379,7 +406,10 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read on %s failed: %w", src, err)
|
return fmt.Errorf("read on %s failed: %w", src, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.parsePool.Submit(func() {
|
||||||
onData(src, buf)
|
onData(src, buf)
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue