diff --git a/plugins/inputs/socket_listener/packet_listener.go b/plugins/common/socket/datagram.go similarity index 89% rename from plugins/inputs/socket_listener/packet_listener.go rename to plugins/common/socket/datagram.go index f7d4e2d0d..d7999fcf7 100644 --- a/plugins/inputs/socket_listener/packet_listener.go +++ b/plugins/common/socket/datagram.go @@ -1,4 +1,4 @@ -package socket_listener +package socket import ( "errors" @@ -18,7 +18,8 @@ type packetListener struct { MaxDecompressionSize int64 SocketMode string ReadBufferSize int - Parser telegraf.Parser + OnData CallbackData + OnError CallbackError Log telegraf.Logger conn net.PacketConn @@ -26,31 +27,25 @@ type packetListener struct { path string } -func (l *packetListener) listen(acc telegraf.Accumulator) { +func (l *packetListener) listen() { buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet for { n, _, err := l.conn.ReadFrom(buf) if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - acc.AddError(err) + if l.OnError != nil { + l.OnError(err) + } } break } body, err := l.decoder.Decode(buf[:n]) - if err != nil { - acc.AddError(fmt.Errorf("unable to decode incoming packet: %w", err)) + if err != nil && l.OnError != nil { + l.OnError(fmt.Errorf("unable to decode incoming packet: %w", err)) } - metrics, err := l.Parser.Parse(body) - if err != nil { - acc.AddError(fmt.Errorf("unable to parse incoming packet: %w", err)) - // TODO rate limit - continue - } - for _, m := range metrics { - acc.AddMetric(m) - } + l.OnData(body) } } @@ -164,7 +159,7 @@ func (l *packetListener) setupIP(u *url.URL) error { return nil } -func (l *packetListener) addr() net.Addr { +func (l *packetListener) address() net.Addr { return l.conn.LocalAddr() } diff --git a/plugins/common/socket/sample.conf b/plugins/common/socket/sample.conf new file mode 100644 index 000000000..a87b90ad0 --- /dev/null +++ b/plugins/common/socket/sample.conf @@ -0,0 +1,75 @@ + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. + ## ex: socket_mode = "777" + # socket_mode = "" + + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 + + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" + + ## Optional TLS configuration (only available on stream sockets like TCP) + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. + # read_buffer_size = "64KiB" + + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. + # keep_alive_period = "5m" + + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. + # content_encoding = "identity" + + ## Maximum size of decoded packet (in bytes when no unit specified) + # max_decompression_size = "500MB" + + ## Message splitting strategy and corresponding settings for stream sockets + ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet + ## listeners such as udp. + ## Available strategies are: + ## newline -- split at newlines (default) + ## null -- split at null bytes + ## delimiter -- split at delimiter byte-sequence in hex-format + ## given in `splitting_delimiter` + ## fixed length -- split after number of bytes given in `splitting_length` + ## variable length -- split depending on length information received in the + ## data. The length field information is specified in + ## `splitting_length_field`. + # splitting_strategy = "newline" + + ## Delimiter used to split received data to messages consumed by the parser. + ## The delimiter is a hex byte-sequence marking the end of a message + ## e.g. "0x0D0A", "x0d0a" or "0d0a" marks a Windows line-break (CR LF). + ## The value is case-insensitive and can be specified with "0x" or "x" prefix + ## or without. + ## Note: This setting is only used for splitting_strategy = "delimiter". + # splitting_delimiter = "" + + ## Fixed length of a message in bytes. + ## Note: This setting is only used for splitting_strategy = "fixed length". + # splitting_length = 0 + + ## Specification of the length field contained in the data to split messages + ## with variable length. The specification contains the following fields: + ## offset -- start of length field in bytes from begin of data + ## bytes -- length of length field in bytes + ## endianness -- endianness of the value, either "be" for big endian or + ## "le" for little endian + ## header_length -- total length of header to be skipped when passing + ## data on to the parser. If zero (default), the header + ## is passed on to the parser together with the message. + ## Note: This setting is only used for splitting_strategy = "variable length". + # splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0} \ No newline at end of file diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go new file mode 100644 index 000000000..19fd1e043 --- /dev/null +++ b/plugins/common/socket/socket.go @@ -0,0 +1,279 @@ +package socket + +import ( + "bufio" + "crypto/tls" + "encoding/binary" + "encoding/hex" + "fmt" + "net" + "net/url" + "regexp" + "strings" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + tlsint "github.com/influxdata/telegraf/plugins/common/tls" +) + +type listener interface { + address() net.Addr + listen() + close() error +} + +type lengthFieldSpec struct { + Offset int64 `toml:"offset"` + Bytes int64 `toml:"bytes"` + Endianness string `toml:"endianness"` + HeaderLength int64 `toml:"header_length"` + converter func([]byte) int +} + +type CallbackData func([]byte) +type CallbackError func(error) + +type Config struct { + MaxConnections int `toml:"max_connections"` + ReadBufferSize config.Size `toml:"read_buffer_size"` + ReadTimeout config.Duration `toml:"read_timeout"` + KeepAlivePeriod *config.Duration `toml:"keep_alive_period"` + SocketMode string `toml:"socket_mode"` + ContentEncoding string `toml:"content_encoding"` + MaxDecompressionSize config.Size `toml:"max_decompression_size"` + SplittingStrategy string `toml:"splitting_strategy"` + SplittingDelimiter string `toml:"splitting_delimiter"` + SplittingLength int `toml:"splitting_length"` + SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"` + tlsint.ServerConfig +} + +type Socket struct { + Config + + url *url.URL + interfaceName string + tlsCfg *tls.Config + log telegraf.Logger + + splitter bufio.SplitFunc + wg sync.WaitGroup + + listener listener +} + +func (cfg *Config) NewSocket(address string, logger telegraf.Logger) (*Socket, error) { + s := &Socket{ + Config: *cfg, + log: logger, + } + + switch s.SplittingStrategy { + case "", "newline": + s.splitter = bufio.ScanLines + case "null": + s.splitter = scanNull + case "delimiter": + re := regexp.MustCompile(`(\s*0?x)`) + d := re.ReplaceAllString(strings.ToLower(s.SplittingDelimiter), "") + delimiter, err := hex.DecodeString(d) + if err != nil { + return nil, fmt.Errorf("decoding delimiter failed: %w", err) + } + s.splitter = createScanDelimiter(delimiter) + case "fixed length": + s.splitter = createScanFixedLength(s.SplittingLength) + case "variable length": + // Create the converter function + var order binary.ByteOrder + switch strings.ToLower(s.SplittingLengthField.Endianness) { + case "", "be": + order = binary.BigEndian + case "le": + order = binary.LittleEndian + default: + return nil, fmt.Errorf("invalid 'endianness' %q", s.SplittingLengthField.Endianness) + } + + switch s.SplittingLengthField.Bytes { + case 1: + s.SplittingLengthField.converter = func(b []byte) int { + return int(b[0]) + } + case 2: + s.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint16(b)) + } + case 4: + s.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint32(b)) + } + case 8: + s.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint64(b)) + } + default: + s.SplittingLengthField.converter = func(b []byte) int { + buf := make([]byte, 8) + start := 0 + if order == binary.BigEndian { + start = 8 - len(b) + } + for i := 0; i < len(b); i++ { + buf[start+i] = b[i] + } + return int(order.Uint64(buf)) + } + } + + // Check if we have enough bytes in the header + s.splitter = createScanVariableLength(s.SplittingLengthField) + default: + return nil, fmt.Errorf("unknown 'splitting_strategy' %q", s.SplittingStrategy) + } + + // Resolve the interface to an address if any given + ifregex := regexp.MustCompile(`%([\w\.]+)`) + if matches := ifregex.FindStringSubmatch(address); len(matches) == 2 { + s.interfaceName = matches[1] + address = strings.Replace(address, "%"+s.interfaceName, "", 1) + } + + // Preparing TLS configuration + tlsCfg, err := s.ServerConfig.TLSConfig() + if err != nil { + return nil, fmt.Errorf("getting TLS config failed: %w", err) + } + s.tlsCfg = tlsCfg + + // Parse and check the address + u, err := url.Parse(address) + if err != nil { + return nil, fmt.Errorf("parsing address failed: %w", err) + } + s.url = u + + switch s.url.Scheme { + case "tcp", "tcp4", "tcp6", "unix", "unixpacket", + "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram", "vsock": + default: + return nil, fmt.Errorf("unknown protocol %q in %q", u.Scheme, address) + } + + return s, nil +} + +func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { + switch s.url.Scheme { + case "tcp", "tcp4", "tcp6": + l := &streamListener{ + ReadBufferSize: int(s.ReadBufferSize), + ReadTimeout: s.ReadTimeout, + KeepAlivePeriod: s.KeepAlivePeriod, + MaxConnections: s.MaxConnections, + Encoding: s.ContentEncoding, + Splitter: s.splitter, + OnData: onData, + OnError: onError, + Log: s.log, + } + + if err := l.setupTCP(s.url, s.tlsCfg); err != nil { + return err + } + s.listener = l + case "unix", "unixpacket": + l := &streamListener{ + ReadBufferSize: int(s.ReadBufferSize), + ReadTimeout: s.ReadTimeout, + KeepAlivePeriod: s.KeepAlivePeriod, + MaxConnections: s.MaxConnections, + Encoding: s.ContentEncoding, + Splitter: s.splitter, + OnData: onData, + OnError: onError, + Log: s.log, + } + + if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil { + return err + } + s.listener = l + case "udp", "udp4", "udp6": + l := &packetListener{ + Encoding: s.ContentEncoding, + MaxDecompressionSize: int64(s.MaxDecompressionSize), + OnData: onData, + OnError: onError, + } + if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil { + return err + } + s.listener = l + case "ip", "ip4", "ip6": + l := &packetListener{ + Encoding: s.ContentEncoding, + MaxDecompressionSize: int64(s.MaxDecompressionSize), + OnData: onData, + OnError: onError, + } + if err := l.setupIP(s.url); err != nil { + return err + } + s.listener = l + case "unixgram": + l := &packetListener{ + Encoding: s.ContentEncoding, + MaxDecompressionSize: int64(s.MaxDecompressionSize), + OnData: onData, + OnError: onError, + } + if err := l.setupUnixgram(s.url, s.SocketMode); err != nil { + return err + } + s.listener = l + case "vsock": + l := &streamListener{ + ReadBufferSize: int(s.ReadBufferSize), + ReadTimeout: s.ReadTimeout, + KeepAlivePeriod: s.KeepAlivePeriod, + MaxConnections: s.MaxConnections, + Encoding: s.ContentEncoding, + Splitter: s.splitter, + OnData: onData, + OnError: onError, + Log: s.log, + } + + if err := l.setupVsock(s.url); err != nil { + return err + } + s.listener = l + default: + return fmt.Errorf("unknown protocol %q", s.url.Scheme) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.listener.listen() + }() + + return nil +} + +func (s *Socket) Close() { + if s.listener != nil { + // Ignore the returned error as we cannot do anything about it anyway + if err := s.listener.close(); err != nil { + s.log.Warnf("Closing socket failed: %v", err) + } + s.listener = nil + } + s.wg.Wait() +} + +func (s *Socket) Address() net.Addr { + return s.listener.address() +} diff --git a/plugins/common/socket/socket_test.go b/plugins/common/socket/socket_test.go new file mode 100644 index 000000000..9334422d1 --- /dev/null +++ b/plugins/common/socket/socket_test.go @@ -0,0 +1,282 @@ +package socket + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + _ "github.com/influxdata/telegraf/plugins/parsers/all" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +var pki = testutil.NewPKI("../../../testutil/pki") + +func TestSocketListener(t *testing.T) { + messages := [][]byte{ + []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"), + []byte("test,foo=zab v=3i 123456791\n"), + } + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"v": int64(1)}, + time.Unix(0, 123456789), + ), + metric.New( + "test", + map[string]string{"foo": "baz"}, + map[string]interface{}{"v": int64(2)}, + time.Unix(0, 123456790), + ), + metric.New( + "test", + map[string]string{"foo": "zab"}, + map[string]interface{}{"v": int64(3)}, + time.Unix(0, 123456791), + ), + } + + tests := []struct { + name string + schema string + buffersize config.Size + encoding string + }{ + { + name: "TCP", + schema: "tcp", + buffersize: config.Size(1024), + }, + { + name: "TCP with TLS", + schema: "tcp+tls", + }, + { + name: "TCP with gzip encoding", + schema: "tcp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "UDP", + schema: "udp", + buffersize: config.Size(1024), + }, + { + name: "UDP with gzip encoding", + schema: "udp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "unix socket", + schema: "unix", + buffersize: config.Size(1024), + }, + { + name: "unix socket with TLS", + schema: "unix+tls", + }, + { + name: "unix socket with gzip encoding", + schema: "unix", + encoding: "gzip", + }, + { + name: "unixgram socket", + schema: "unixgram", + buffersize: config.Size(1024), + }, + } + + serverTLS := pki.TLSServerConfig() + clientTLS := pki.TLSClientConfig() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + proto := strings.TrimSuffix(tt.schema, "+tls") + + // Prepare the address and socket if needed + var serviceAddress string + var tlsCfg *tls.Config + switch proto { + case "tcp", "udp": + serviceAddress = proto + "://" + "127.0.0.1:0" + case "unix", "unixgram": + if runtime.GOOS == "windows" { + t.Skip("Skipping on Windows, as unixgram sockets are not supported") + } + + // Create a socket + fn := testutil.TempSocket(t) + f, err := os.Create(fn) + require.NoError(t, err) + defer f.Close() + serviceAddress = proto + "://" + fn + } + + // Setup the configuration according to test specification + cfg := &Config{ + ContentEncoding: tt.encoding, + ReadBufferSize: tt.buffersize, + } + if strings.HasSuffix(tt.schema, "tls") { + cfg.ServerConfig = *serverTLS + var err error + tlsCfg, err = clientTLS.TLSConfig() + require.NoError(t, err) + } + + // Create the socket + sock, err := cfg.NewSocket(serviceAddress, &testutil.Logger{}) + require.NoError(t, err) + + // Create callbacks + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + var acc testutil.Accumulator + onData := func(data []byte) { + m, err := parser.Parse(data) + require.NoError(t, err) + acc.AddMetrics(m) + } + onError := func(err error) { + acc.AddError(err) + } + + // Start the listener + require.NoError(t, sock.Listen(onData, onError)) + defer sock.Close() + + addr := sock.Address() + + // Create a noop client + // Server is async, so verify no errors at the end. + client, err := createClient(serviceAddress, addr, tlsCfg) + require.NoError(t, err) + require.NoError(t, client.Close()) + + // Setup the client for submitting data + client, err = createClient(serviceAddress, addr, tlsCfg) + require.NoError(t, err) + + // Send the data with the correct encoding + encoder, err := internal.NewContentEncoder(tt.encoding) + require.NoError(t, err) + + for i, msg := range messages { + m, err := encoder.Encode(msg) + require.NoErrorf(t, err, "encoding failed for msg %d", i) + _, err = client.Write(m) + require.NoErrorf(t, err, "sending msg %d failed", i) + } + + // Test the resulting metrics and compare against expected results + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + }) + } +} + +func TestSocketListenerStream(t *testing.T) { + // Setup the configuration + cfg := &Config{ + ReadBufferSize: 1024, + } + + // Create the socket + serviceAddress := "tcp://127.0.0.1:0" + logger := &testutil.CaptureLogger{} + sock, err := cfg.NewSocket(serviceAddress, logger) + require.NoError(t, err) + + // Create callbacks + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + var acc testutil.Accumulator + onData := func(data []byte) { + m, err := parser.Parse(data) + require.NoError(t, err) + acc.AddMetrics(m) + } + onError := func(err error) { + acc.AddError(err) + } + + // Start the listener + require.NoError(t, sock.Listen(onData, onError)) + defer sock.Close() + + addr := sock.Address() + + // Create a noop client + client, err := createClient(serviceAddress, addr, nil) + require.NoError(t, err) + + _, err = client.Write([]byte("test value=42i\n")) + require.NoError(t, err) + + require.Eventually(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= 1 + }, time.Second, 100*time.Millisecond, "did not receive metric") + + // This has to be a stream-listener... + listener, ok := sock.listener.(*streamListener) + require.True(t, ok) + listener.Lock() + conns := len(listener.connections) + listener.Unlock() + require.NotZero(t, conns) + + sock.Close() + + // Verify that plugin.Stop() closed the client's connection + _ = client.SetReadDeadline(time.Now().Add(time.Second)) + buf := []byte{1} + _, err = client.Read(buf) + require.Equal(t, err, io.EOF) + + require.Empty(t, logger.Errors()) + require.Empty(t, logger.Warnings()) +} + +func createClient(endpoint string, addr net.Addr, tlsCfg *tls.Config) (net.Conn, error) { + // Determine the protocol in a crude fashion + parts := strings.SplitN(endpoint, "://", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid endpoint %q", endpoint) + } + protocol := parts[0] + + if tlsCfg == nil { + return net.Dial(protocol, addr.String()) + } + + if protocol == "unix" { + tlsCfg.InsecureSkipVerify = true + } + return tls.Dial(protocol, addr.String(), tlsCfg) +} diff --git a/plugins/inputs/socket_listener/splitters.go b/plugins/common/socket/splitters.go similarity index 98% rename from plugins/inputs/socket_listener/splitters.go rename to plugins/common/socket/splitters.go index 295c7b9e1..278c9fc16 100644 --- a/plugins/inputs/socket_listener/splitters.go +++ b/plugins/common/socket/splitters.go @@ -1,4 +1,4 @@ -package socket_listener +package socket import ( "bufio" diff --git a/plugins/inputs/socket_listener/stream_listener.go b/plugins/common/socket/stream.go similarity index 88% rename from plugins/inputs/socket_listener/stream_listener.go rename to plugins/common/socket/stream.go index bb5ff28e9..24cec1b6d 100644 --- a/plugins/inputs/socket_listener/stream_listener.go +++ b/plugins/common/socket/stream.go @@ -1,4 +1,4 @@ -package socket_listener +package socket import ( "bufio" @@ -34,11 +34,12 @@ type streamListener struct { ReadTimeout config.Duration KeepAlivePeriod *config.Duration Splitter bufio.SplitFunc - Parser telegraf.Parser + OnData CallbackData + OnError CallbackError Log telegraf.Logger listener net.Listener - connections map[net.Conn]struct{} + connections map[net.Conn]bool path string wg sync.WaitGroup @@ -124,7 +125,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error { _ = conn.Close() return fmt.Errorf("unable to accept connection from %q: too many connections", addr) } - l.connections[conn] = struct{}{} + l.connections[conn] = true l.Unlock() if l.ReadBufferSize > 0 { @@ -169,7 +170,7 @@ func (l *streamListener) closeConnection(conn net.Conn) { delete(l.connections, conn) } -func (l *streamListener) addr() net.Addr { +func (l *streamListener) address() net.Addr { return l.listener.Addr() } @@ -195,8 +196,8 @@ func (l *streamListener) close() error { return nil } -func (l *streamListener) listen(acc telegraf.Accumulator) { - l.connections = make(map[net.Conn]struct{}) +func (l *streamListener) listen() { + l.connections = make(map[net.Conn]bool) l.wg.Add(1) defer l.wg.Done() @@ -205,23 +206,25 @@ func (l *streamListener) listen(acc telegraf.Accumulator) { for { conn, err := l.listener.Accept() if err != nil { - if !errors.Is(err, net.ErrClosed) { - acc.AddError(err) + if !errors.Is(err, net.ErrClosed) && l.OnError != nil { + l.OnError(err) } break } - if err := l.setupConnection(conn); err != nil { - acc.AddError(err) + if err := l.setupConnection(conn); err != nil && l.OnError != nil { + l.OnError(err) continue } wg.Add(1) go func(c net.Conn) { defer wg.Done() - if err := l.read(acc, c); err != nil { + if err := l.read(c); err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) { - acc.AddError(err) + if l.OnError != nil { + l.OnError(err) + } } } l.Lock() @@ -232,7 +235,7 @@ func (l *streamListener) listen(acc telegraf.Accumulator) { wg.Wait() } -func (l *streamListener) read(acc telegraf.Accumulator, conn net.Conn) error { +func (l *streamListener) read(conn net.Conn) error { decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn) if err != nil { return fmt.Errorf("creating decoder failed: %w", err) @@ -259,15 +262,7 @@ func (l *streamListener) read(acc telegraf.Accumulator, conn net.Conn) error { } data := scanner.Bytes() - metrics, err := l.Parser.Parse(data) - if err != nil { - acc.AddError(fmt.Errorf("parsing error: %w", err)) - l.Log.Debugf("invalid data for parser: %v", data) - continue - } - for _, m := range metrics { - acc.AddMetric(m) - } + l.OnData(data) } if err := scanner.Err(); err != nil { diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index b19e2de49..515f2d1c4 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -44,55 +44,42 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # service_address = "unixgram:///tmp/telegraf.sock" # service_address = "vsock://cid:port" - ## Change the file mode bits on unix sockets. These permissions may not be - ## respected by some platforms, to safely restrict write permissions it is best - ## to place the socket into a directory that has previously been created - ## with the desired permissions. + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. ## ex: socket_mode = "777" # socket_mode = "" - ## Maximum number of concurrent connections. - ## Only applies to stream sockets (e.g. TCP). - ## 0 (default) is unlimited. - # max_connections = 1024 + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 - ## Read timeout. - ## Only applies to stream sockets (e.g. TCP). - ## 0 (default) is unlimited. - # read_timeout = "30s" + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" - ## Optional TLS configuration. - ## Only applies to stream sockets (e.g. TCP). + ## Optional TLS configuration (only available on stream sockets like TCP) # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Enables client authentication if set. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - ## Maximum socket buffer size (in bytes when no unit specified). - ## For stream sockets, once the buffer fills up, the sender will start backing up. - ## For datagram sockets, once the buffer fills up, metrics will start dropping. - ## Defaults to the OS default. + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. # read_buffer_size = "64KiB" - ## Period between keep alive probes. - ## Only applies to TCP sockets. - ## 0 disables keep alive probes. - ## Defaults to the OS configuration. + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. # keep_alive_period = "5m" - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - # data_format = "influx" - - ## Content encoding for message payloads, can be set to "gzip" to or - ## "identity" to apply no encoding. + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. # content_encoding = "identity" - ## Maximum size of decoded packet. - ## Acceptable units are B, KiB, KB, MiB, MB... - ## Without quotes and units, interpreted as size in bytes. + ## Maximum size of decoded packet (in bytes when no unit specified) # max_decompression_size = "500MB" ## Message splitting strategy and corresponding settings for stream sockets @@ -132,6 +119,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## is passed on to the parser together with the message. ## Note: This setting is only used for splitting_strategy = "variable length". # splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0} + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" ``` ## A Note on UDP OS Buffer Sizes diff --git a/plugins/inputs/socket_listener/sample.conf b/plugins/inputs/socket_listener/sample.conf index 7b4983e09..33162ae81 100644 --- a/plugins/inputs/socket_listener/sample.conf +++ b/plugins/inputs/socket_listener/sample.conf @@ -13,55 +13,42 @@ # service_address = "unixgram:///tmp/telegraf.sock" # service_address = "vsock://cid:port" - ## Change the file mode bits on unix sockets. These permissions may not be - ## respected by some platforms, to safely restrict write permissions it is best - ## to place the socket into a directory that has previously been created - ## with the desired permissions. + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. ## ex: socket_mode = "777" # socket_mode = "" - ## Maximum number of concurrent connections. - ## Only applies to stream sockets (e.g. TCP). - ## 0 (default) is unlimited. - # max_connections = 1024 + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 - ## Read timeout. - ## Only applies to stream sockets (e.g. TCP). - ## 0 (default) is unlimited. - # read_timeout = "30s" + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" - ## Optional TLS configuration. - ## Only applies to stream sockets (e.g. TCP). + ## Optional TLS configuration (only available on stream sockets like TCP) # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Enables client authentication if set. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - ## Maximum socket buffer size (in bytes when no unit specified). - ## For stream sockets, once the buffer fills up, the sender will start backing up. - ## For datagram sockets, once the buffer fills up, metrics will start dropping. - ## Defaults to the OS default. + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. # read_buffer_size = "64KiB" - ## Period between keep alive probes. - ## Only applies to TCP sockets. - ## 0 disables keep alive probes. - ## Defaults to the OS configuration. + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. # keep_alive_period = "5m" - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - # data_format = "influx" - - ## Content encoding for message payloads, can be set to "gzip" to or - ## "identity" to apply no encoding. + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. # content_encoding = "identity" - ## Maximum size of decoded packet. - ## Acceptable units are B, KiB, KB, MiB, MB... - ## Without quotes and units, interpreted as size in bytes. + ## Maximum size of decoded packet (in bytes when no unit specified) # max_decompression_size = "500MB" ## Message splitting strategy and corresponding settings for stream sockets @@ -101,3 +88,9 @@ ## is passed on to the parser together with the message. ## Note: This setting is only used for splitting_strategy = "variable length". # splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0} + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" diff --git a/plugins/inputs/socket_listener/sample.conf.in b/plugins/inputs/socket_listener/sample.conf.in new file mode 100644 index 000000000..f8c1e19a9 --- /dev/null +++ b/plugins/inputs/socket_listener/sample.conf.in @@ -0,0 +1,22 @@ +# Generic socket listener capable of handling multiple socket types. +[[inputs.socket_listener]] + ## URL to listen on + # service_address = "tcp://:8094" + # service_address = "tcp://127.0.0.1:http" + # service_address = "tcp4://:8094" + # service_address = "tcp6://:8094" + # service_address = "tcp6://[2001:db8::1]:8094" + # service_address = "udp://:8094" + # service_address = "udp4://:8094" + # service_address = "udp6://:8094" + # service_address = "unix:///tmp/telegraf.sock" + # service_address = "unixgram:///tmp/telegraf.sock" + # service_address = "vsock://cid:port" + +{{template "/plugins/common/socket/sample.conf"}} + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b57c29c14..6893d3f15 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -1,62 +1,25 @@ +//go:generate ../../../tools/config_includer/generator //go:generate ../../../tools/readme_config_includer/generator package socket_listener import ( - "bufio" _ "embed" - "encoding/binary" - "encoding/hex" - "fmt" - "net" - "net/url" - "regexp" - "strings" - "sync" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" - tlsint "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/common/socket" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string -type listener interface { - listen(acc telegraf.Accumulator) - addr() net.Addr - close() error -} - -type lengthFieldSpec struct { - Offset int64 `toml:"offset"` - Bytes int64 `toml:"bytes"` - Endianness string `toml:"endianness"` - HeaderLength int64 `toml:"header_length"` - converter func([]byte) int -} - type SocketListener struct { - ServiceAddress string `toml:"service_address"` - MaxConnections int `toml:"max_connections"` - ReadBufferSize config.Size `toml:"read_buffer_size"` - ReadTimeout config.Duration `toml:"read_timeout"` - KeepAlivePeriod *config.Duration `toml:"keep_alive_period"` - SocketMode string `toml:"socket_mode"` - ContentEncoding string `toml:"content_encoding"` - MaxDecompressionSize config.Size `toml:"max_decompression_size"` - SplittingStrategy string `toml:"splitting_strategy"` - SplittingDelimiter string `toml:"splitting_delimiter"` - SplittingLength int `toml:"splitting_length"` - SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"` - Log telegraf.Logger `toml:"-"` - tlsint.ServerConfig + ServiceAddress string `toml:"service_address"` + Log telegraf.Logger `toml:"-"` + socket.Config - wg sync.WaitGroup - parser telegraf.Parser - splitter bufio.SplitFunc - - listener listener + socket *socket.Socket + parser telegraf.Parser } func (*SocketListener) SampleConfig() string { @@ -64,69 +27,12 @@ func (*SocketListener) SampleConfig() string { } func (sl *SocketListener) Init() error { - switch sl.SplittingStrategy { - case "", "newline": - sl.splitter = bufio.ScanLines - case "null": - sl.splitter = scanNull - case "delimiter": - re := regexp.MustCompile(`(\s*0?x)`) - d := re.ReplaceAllString(strings.ToLower(sl.SplittingDelimiter), "") - delimiter, err := hex.DecodeString(d) - if err != nil { - return fmt.Errorf("decoding delimiter failed: %w", err) - } - sl.splitter = createScanDelimiter(delimiter) - case "fixed length": - sl.splitter = createScanFixedLength(sl.SplittingLength) - case "variable length": - // Create the converter function - var order binary.ByteOrder - switch strings.ToLower(sl.SplittingLengthField.Endianness) { - case "", "be": - order = binary.BigEndian - case "le": - order = binary.LittleEndian - default: - return fmt.Errorf("invalid 'endianness' %q", sl.SplittingLengthField.Endianness) - } - - switch sl.SplittingLengthField.Bytes { - case 1: - sl.SplittingLengthField.converter = func(b []byte) int { - return int(b[0]) - } - case 2: - sl.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint16(b)) - } - case 4: - sl.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint32(b)) - } - case 8: - sl.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint64(b)) - } - default: - sl.SplittingLengthField.converter = func(b []byte) int { - buf := make([]byte, 8) - start := 0 - if order == binary.BigEndian { - start = 8 - len(b) - } - for i := 0; i < len(b); i++ { - buf[start+i] = b[i] - } - return int(order.Uint64(buf)) - } - } - - // Check if we have enough bytes in the header - sl.splitter = createScanVariableLength(sl.SplittingLengthField) - default: - return fmt.Errorf("unknown 'splitting_strategy' %q", sl.SplittingStrategy) + sock, err := sl.Config.NewSocket(sl.ServiceAddress, sl.Log) + if err != nil { + return err } + sl.socket = sock + return nil } @@ -139,128 +45,35 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) { } func (sl *SocketListener) Start(acc telegraf.Accumulator) error { - // Resolve the interface to an address if any given - var ifname string - ifregex := regexp.MustCompile(`%([\w\.]+)`) - if matches := ifregex.FindStringSubmatch(sl.ServiceAddress); len(matches) == 2 { - ifname := matches[1] - sl.ServiceAddress = strings.Replace(sl.ServiceAddress, "%"+ifname, "", 1) + // Create the callbacks for parsing the data and recording issues + onData := func(data []byte) { + metrics, err := sl.parser.Parse(data) + if err != nil { + acc.AddError(err) + return + } + for _, m := range metrics { + acc.AddMetric(m) + } + } + onError := func(err error) { + acc.AddError(err) } - // Preparing TLS configuration - tlsCfg, err := sl.ServerConfig.TLSConfig() - if err != nil { - return fmt.Errorf("getting TLS config failed: %w", err) + // Start the listener + if err := sl.socket.Listen(onData, onError); err != nil { + return err } - - // Setup the network connection - u, err := url.Parse(sl.ServiceAddress) - if err != nil { - return fmt.Errorf("parsing address failed: %w", err) - } - - switch u.Scheme { - case "tcp", "tcp4", "tcp6": - ssl := &streamListener{ - ReadBufferSize: int(sl.ReadBufferSize), - ReadTimeout: sl.ReadTimeout, - KeepAlivePeriod: sl.KeepAlivePeriod, - MaxConnections: sl.MaxConnections, - Encoding: sl.ContentEncoding, - Splitter: sl.splitter, - Parser: sl.parser, - Log: sl.Log, - } - - if err := ssl.setupTCP(u, tlsCfg); err != nil { - return err - } - sl.listener = ssl - case "unix", "unixpacket": - ssl := &streamListener{ - ReadBufferSize: int(sl.ReadBufferSize), - ReadTimeout: sl.ReadTimeout, - KeepAlivePeriod: sl.KeepAlivePeriod, - MaxConnections: sl.MaxConnections, - Encoding: sl.ContentEncoding, - Splitter: sl.splitter, - Parser: sl.parser, - Log: sl.Log, - } - - if err := ssl.setupUnix(u, tlsCfg, sl.SocketMode); err != nil { - return err - } - sl.listener = ssl - - case "udp", "udp4", "udp6": - psl := &packetListener{ - Encoding: sl.ContentEncoding, - MaxDecompressionSize: int64(sl.MaxDecompressionSize), - Parser: sl.parser, - } - if err := psl.setupUDP(u, ifname, int(sl.ReadBufferSize)); err != nil { - return err - } - sl.listener = psl - case "ip", "ip4", "ip6": - psl := &packetListener{ - Encoding: sl.ContentEncoding, - MaxDecompressionSize: int64(sl.MaxDecompressionSize), - Parser: sl.parser, - } - if err := psl.setupIP(u); err != nil { - return err - } - sl.listener = psl - case "unixgram": - psl := &packetListener{ - Encoding: sl.ContentEncoding, - MaxDecompressionSize: int64(sl.MaxDecompressionSize), - Parser: sl.parser, - } - if err := psl.setupUnixgram(u, sl.SocketMode); err != nil { - return err - } - sl.listener = psl - case "vsock": - ssl := &streamListener{ - ReadBufferSize: int(sl.ReadBufferSize), - ReadTimeout: sl.ReadTimeout, - KeepAlivePeriod: sl.KeepAlivePeriod, - MaxConnections: sl.MaxConnections, - Encoding: sl.ContentEncoding, - Splitter: sl.splitter, - Parser: sl.parser, - Log: sl.Log, - } - - if err := ssl.setupVsock(u); err != nil { - return err - } - sl.listener = ssl - default: - return fmt.Errorf("unknown protocol %q in %q", u.Scheme, sl.ServiceAddress) - } - - sl.Log.Infof("Listening on %s://%s", u.Scheme, sl.listener.addr()) - - sl.wg.Add(1) - go func() { - defer sl.wg.Done() - sl.listener.listen(acc) - }() + addr := sl.socket.Address() + sl.Log.Infof("Listening on %s://%s", addr.Network(), addr.String()) return nil } func (sl *SocketListener) Stop() { - if sl.listener != nil { - // Ignore the returned error as we cannot do anything about it anyway - _ = sl.listener.close() - sl.listener = nil + if sl.socket != nil { + sl.socket.Close() } - sl.wg.Wait() } func init() { diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 7bdc63d7e..ac3ca3815 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net" "os" "path/filepath" @@ -22,6 +21,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/socket" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/parsers/all" "github.com/influxdata/telegraf/plugins/parsers/influx" @@ -137,10 +137,12 @@ func TestSocketListener(t *testing.T) { // Setup plugin according to test specification plugin := &SocketListener{ - Log: &testutil.Logger{}, - ServiceAddress: proto + "://" + serverAddr, - ContentEncoding: tt.encoding, - ReadBufferSize: tt.buffersize, + ServiceAddress: proto + "://" + serverAddr, + Config: socket.Config{ + ContentEncoding: tt.encoding, + ReadBufferSize: tt.buffersize, + }, + Log: &testutil.Logger{}, } if strings.HasSuffix(tt.schema, "tls") { plugin.ServerConfig = *serverTLS @@ -158,7 +160,7 @@ func TestSocketListener(t *testing.T) { require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() - addr := plugin.listener.addr() + addr := plugin.socket.Address() // Create a noop client // Server is async, so verify no errors at the end. @@ -193,59 +195,6 @@ func TestSocketListener(t *testing.T) { } } -func TestSocketListenerStream(t *testing.T) { - logger := &testutil.CaptureLogger{} - - plugin := &SocketListener{ - Log: logger, - ServiceAddress: "tcp://127.0.0.1:0", - ReadBufferSize: 1024, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - plugin.SetParser(parser) - - // Start the plugin - var acc testutil.Accumulator - require.NoError(t, plugin.Init()) - require.NoError(t, plugin.Start(&acc)) - defer plugin.Stop() - - addr := plugin.listener.addr() - - // Create a noop client - client, err := createClient(plugin.ServiceAddress, addr, nil) - require.NoError(t, err) - - _, err = client.Write([]byte("test value=42i\n")) - require.NoError(t, err) - - require.Eventually(t, func() bool { - acc.Lock() - defer acc.Unlock() - return acc.NMetrics() >= 1 - }, time.Second, 100*time.Millisecond, "did not receive metric") - - // This has to be a stream-listener... - listener, ok := plugin.listener.(*streamListener) - require.True(t, ok) - listener.Lock() - conns := len(listener.connections) - listener.Unlock() - require.NotZero(t, conns) - - plugin.Stop() - - // Verify that plugin.Stop() closed the client's connection - _ = client.SetReadDeadline(time.Now().Add(time.Second)) - buf := []byte{1} - _, err = client.Read(buf) - require.Equal(t, err, io.EOF) - - require.Empty(t, logger.Errors()) - require.Empty(t, logger.Warnings()) -} - func TestCases(t *testing.T) { // Get all directories in testdata folders, err := os.ReadDir("testcases") @@ -314,7 +263,7 @@ func TestCases(t *testing.T) { defer plugin.Stop() // Create a client without TLS - addr := plugin.listener.addr() + addr := plugin.socket.Address() client, err := createClient(plugin.ServiceAddress, addr, nil) require.NoError(t, err) diff --git a/plugins/inputs/socket_listener/testcases/invalid_line_format/expected.err b/plugins/inputs/socket_listener/testcases/invalid_line_format/expected.err index ded386b17..a7d5357c9 100644 --- a/plugins/inputs/socket_listener/testcases/invalid_line_format/expected.err +++ b/plugins/inputs/socket_listener/testcases/invalid_line_format/expected.err @@ -1 +1 @@ -parsing error: metric parse error \ No newline at end of file +metric parse error \ No newline at end of file