chore(inputs.socket_listener): Move underlying socket implementation to common (#14787)
This commit is contained in:
parent
eb0a6991c4
commit
655a8a786b
|
|
@ -1,4 +1,4 @@
|
||||||
package socket_listener
|
package socket
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
@ -18,7 +18,8 @@ type packetListener struct {
|
||||||
MaxDecompressionSize int64
|
MaxDecompressionSize int64
|
||||||
SocketMode string
|
SocketMode string
|
||||||
ReadBufferSize int
|
ReadBufferSize int
|
||||||
Parser telegraf.Parser
|
OnData CallbackData
|
||||||
|
OnError CallbackError
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
|
|
||||||
conn net.PacketConn
|
conn net.PacketConn
|
||||||
|
|
@ -26,31 +27,25 @@ type packetListener struct {
|
||||||
path string
|
path string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *packetListener) listen(acc telegraf.Accumulator) {
|
func (l *packetListener) listen() {
|
||||||
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
|
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
|
||||||
for {
|
for {
|
||||||
n, _, err := l.conn.ReadFrom(buf)
|
n, _, err := l.conn.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
|
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
|
||||||
acc.AddError(err)
|
if l.OnError != nil {
|
||||||
|
l.OnError(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := l.decoder.Decode(buf[:n])
|
body, err := l.decoder.Decode(buf[:n])
|
||||||
if err != nil {
|
if err != nil && l.OnError != nil {
|
||||||
acc.AddError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
l.OnError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics, err := l.Parser.Parse(body)
|
l.OnData(body)
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("unable to parse incoming packet: %w", err))
|
|
||||||
// TODO rate limit
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, m := range metrics {
|
|
||||||
acc.AddMetric(m)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -164,7 +159,7 @@ func (l *packetListener) setupIP(u *url.URL) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *packetListener) addr() net.Addr {
|
func (l *packetListener) address() net.Addr {
|
||||||
return l.conn.LocalAddr()
|
return l.conn.LocalAddr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -0,0 +1,75 @@
|
||||||
|
## Permission for unix sockets (only available on unix sockets)
|
||||||
|
## This setting may not be respected by some platforms. To safely restrict
|
||||||
|
## permissions it is recommended to place the socket into a previously
|
||||||
|
## created directory with the desired permissions.
|
||||||
|
## ex: socket_mode = "777"
|
||||||
|
# socket_mode = ""
|
||||||
|
|
||||||
|
## Maximum number of concurrent connections (only available on stream sockets like TCP)
|
||||||
|
## Zero means unlimited.
|
||||||
|
# max_connections = 0
|
||||||
|
|
||||||
|
## Read timeout (only available on stream sockets like TCP)
|
||||||
|
## Zero means unlimited.
|
||||||
|
# read_timeout = "0s"
|
||||||
|
|
||||||
|
## Optional TLS configuration (only available on stream sockets like TCP)
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Enables client authentication if set.
|
||||||
|
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||||
|
|
||||||
|
## Maximum socket buffer size (in bytes when no unit specified)
|
||||||
|
## For stream sockets, once the buffer fills up, the sender will start
|
||||||
|
## backing up. For datagram sockets, once the buffer fills up, metrics will
|
||||||
|
## start dropping. Defaults to the OS default.
|
||||||
|
# read_buffer_size = "64KiB"
|
||||||
|
|
||||||
|
## Period between keep alive probes (only applies to TCP sockets)
|
||||||
|
## Zero disables keep alive probes. Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
|
## Content encoding for message payloads
|
||||||
|
## Can be set to "gzip" for compressed payloads or "identity" for no encoding.
|
||||||
|
# content_encoding = "identity"
|
||||||
|
|
||||||
|
## Maximum size of decoded packet (in bytes when no unit specified)
|
||||||
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
|
## Message splitting strategy and corresponding settings for stream sockets
|
||||||
|
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
|
||||||
|
## listeners such as udp.
|
||||||
|
## Available strategies are:
|
||||||
|
## newline -- split at newlines (default)
|
||||||
|
## null -- split at null bytes
|
||||||
|
## delimiter -- split at delimiter byte-sequence in hex-format
|
||||||
|
## given in `splitting_delimiter`
|
||||||
|
## fixed length -- split after number of bytes given in `splitting_length`
|
||||||
|
## variable length -- split depending on length information received in the
|
||||||
|
## data. The length field information is specified in
|
||||||
|
## `splitting_length_field`.
|
||||||
|
# splitting_strategy = "newline"
|
||||||
|
|
||||||
|
## Delimiter used to split received data to messages consumed by the parser.
|
||||||
|
## The delimiter is a hex byte-sequence marking the end of a message
|
||||||
|
## e.g. "0x0D0A", "x0d0a" or "0d0a" marks a Windows line-break (CR LF).
|
||||||
|
## The value is case-insensitive and can be specified with "0x" or "x" prefix
|
||||||
|
## or without.
|
||||||
|
## Note: This setting is only used for splitting_strategy = "delimiter".
|
||||||
|
# splitting_delimiter = ""
|
||||||
|
|
||||||
|
## Fixed length of a message in bytes.
|
||||||
|
## Note: This setting is only used for splitting_strategy = "fixed length".
|
||||||
|
# splitting_length = 0
|
||||||
|
|
||||||
|
## Specification of the length field contained in the data to split messages
|
||||||
|
## with variable length. The specification contains the following fields:
|
||||||
|
## offset -- start of length field in bytes from begin of data
|
||||||
|
## bytes -- length of length field in bytes
|
||||||
|
## endianness -- endianness of the value, either "be" for big endian or
|
||||||
|
## "le" for little endian
|
||||||
|
## header_length -- total length of header to be skipped when passing
|
||||||
|
## data on to the parser. If zero (default), the header
|
||||||
|
## is passed on to the parser together with the message.
|
||||||
|
## Note: This setting is only used for splitting_strategy = "variable length".
|
||||||
|
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
|
||||||
|
|
@ -0,0 +1,279 @@
|
||||||
|
package socket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
)
|
||||||
|
|
||||||
|
type listener interface {
|
||||||
|
address() net.Addr
|
||||||
|
listen()
|
||||||
|
close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type lengthFieldSpec struct {
|
||||||
|
Offset int64 `toml:"offset"`
|
||||||
|
Bytes int64 `toml:"bytes"`
|
||||||
|
Endianness string `toml:"endianness"`
|
||||||
|
HeaderLength int64 `toml:"header_length"`
|
||||||
|
converter func([]byte) int
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackData func([]byte)
|
||||||
|
type CallbackError func(error)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
MaxConnections int `toml:"max_connections"`
|
||||||
|
ReadBufferSize config.Size `toml:"read_buffer_size"`
|
||||||
|
ReadTimeout config.Duration `toml:"read_timeout"`
|
||||||
|
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
|
||||||
|
SocketMode string `toml:"socket_mode"`
|
||||||
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
|
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
||||||
|
SplittingStrategy string `toml:"splitting_strategy"`
|
||||||
|
SplittingDelimiter string `toml:"splitting_delimiter"`
|
||||||
|
SplittingLength int `toml:"splitting_length"`
|
||||||
|
SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"`
|
||||||
|
tlsint.ServerConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
type Socket struct {
|
||||||
|
Config
|
||||||
|
|
||||||
|
url *url.URL
|
||||||
|
interfaceName string
|
||||||
|
tlsCfg *tls.Config
|
||||||
|
log telegraf.Logger
|
||||||
|
|
||||||
|
splitter bufio.SplitFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
listener listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg *Config) NewSocket(address string, logger telegraf.Logger) (*Socket, error) {
|
||||||
|
s := &Socket{
|
||||||
|
Config: *cfg,
|
||||||
|
log: logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch s.SplittingStrategy {
|
||||||
|
case "", "newline":
|
||||||
|
s.splitter = bufio.ScanLines
|
||||||
|
case "null":
|
||||||
|
s.splitter = scanNull
|
||||||
|
case "delimiter":
|
||||||
|
re := regexp.MustCompile(`(\s*0?x)`)
|
||||||
|
d := re.ReplaceAllString(strings.ToLower(s.SplittingDelimiter), "")
|
||||||
|
delimiter, err := hex.DecodeString(d)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decoding delimiter failed: %w", err)
|
||||||
|
}
|
||||||
|
s.splitter = createScanDelimiter(delimiter)
|
||||||
|
case "fixed length":
|
||||||
|
s.splitter = createScanFixedLength(s.SplittingLength)
|
||||||
|
case "variable length":
|
||||||
|
// Create the converter function
|
||||||
|
var order binary.ByteOrder
|
||||||
|
switch strings.ToLower(s.SplittingLengthField.Endianness) {
|
||||||
|
case "", "be":
|
||||||
|
order = binary.BigEndian
|
||||||
|
case "le":
|
||||||
|
order = binary.LittleEndian
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid 'endianness' %q", s.SplittingLengthField.Endianness)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch s.SplittingLengthField.Bytes {
|
||||||
|
case 1:
|
||||||
|
s.SplittingLengthField.converter = func(b []byte) int {
|
||||||
|
return int(b[0])
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
s.SplittingLengthField.converter = func(b []byte) int {
|
||||||
|
return int(order.Uint16(b))
|
||||||
|
}
|
||||||
|
case 4:
|
||||||
|
s.SplittingLengthField.converter = func(b []byte) int {
|
||||||
|
return int(order.Uint32(b))
|
||||||
|
}
|
||||||
|
case 8:
|
||||||
|
s.SplittingLengthField.converter = func(b []byte) int {
|
||||||
|
return int(order.Uint64(b))
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
s.SplittingLengthField.converter = func(b []byte) int {
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
start := 0
|
||||||
|
if order == binary.BigEndian {
|
||||||
|
start = 8 - len(b)
|
||||||
|
}
|
||||||
|
for i := 0; i < len(b); i++ {
|
||||||
|
buf[start+i] = b[i]
|
||||||
|
}
|
||||||
|
return int(order.Uint64(buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have enough bytes in the header
|
||||||
|
s.splitter = createScanVariableLength(s.SplittingLengthField)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown 'splitting_strategy' %q", s.SplittingStrategy)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the interface to an address if any given
|
||||||
|
ifregex := regexp.MustCompile(`%([\w\.]+)`)
|
||||||
|
if matches := ifregex.FindStringSubmatch(address); len(matches) == 2 {
|
||||||
|
s.interfaceName = matches[1]
|
||||||
|
address = strings.Replace(address, "%"+s.interfaceName, "", 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preparing TLS configuration
|
||||||
|
tlsCfg, err := s.ServerConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting TLS config failed: %w", err)
|
||||||
|
}
|
||||||
|
s.tlsCfg = tlsCfg
|
||||||
|
|
||||||
|
// Parse and check the address
|
||||||
|
u, err := url.Parse(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing address failed: %w", err)
|
||||||
|
}
|
||||||
|
s.url = u
|
||||||
|
|
||||||
|
switch s.url.Scheme {
|
||||||
|
case "tcp", "tcp4", "tcp6", "unix", "unixpacket",
|
||||||
|
"udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram", "vsock":
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown protocol %q in %q", u.Scheme, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Socket) Listen(onData CallbackData, onError CallbackError) error {
|
||||||
|
switch s.url.Scheme {
|
||||||
|
case "tcp", "tcp4", "tcp6":
|
||||||
|
l := &streamListener{
|
||||||
|
ReadBufferSize: int(s.ReadBufferSize),
|
||||||
|
ReadTimeout: s.ReadTimeout,
|
||||||
|
KeepAlivePeriod: s.KeepAlivePeriod,
|
||||||
|
MaxConnections: s.MaxConnections,
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
Splitter: s.splitter,
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
Log: s.log,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
case "unix", "unixpacket":
|
||||||
|
l := &streamListener{
|
||||||
|
ReadBufferSize: int(s.ReadBufferSize),
|
||||||
|
ReadTimeout: s.ReadTimeout,
|
||||||
|
KeepAlivePeriod: s.KeepAlivePeriod,
|
||||||
|
MaxConnections: s.MaxConnections,
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
Splitter: s.splitter,
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
Log: s.log,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
case "udp", "udp4", "udp6":
|
||||||
|
l := &packetListener{
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
}
|
||||||
|
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
case "ip", "ip4", "ip6":
|
||||||
|
l := &packetListener{
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
}
|
||||||
|
if err := l.setupIP(s.url); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
case "unixgram":
|
||||||
|
l := &packetListener{
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
MaxDecompressionSize: int64(s.MaxDecompressionSize),
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
}
|
||||||
|
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
case "vsock":
|
||||||
|
l := &streamListener{
|
||||||
|
ReadBufferSize: int(s.ReadBufferSize),
|
||||||
|
ReadTimeout: s.ReadTimeout,
|
||||||
|
KeepAlivePeriod: s.KeepAlivePeriod,
|
||||||
|
MaxConnections: s.MaxConnections,
|
||||||
|
Encoding: s.ContentEncoding,
|
||||||
|
Splitter: s.splitter,
|
||||||
|
OnData: onData,
|
||||||
|
OnError: onError,
|
||||||
|
Log: s.log,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.setupVsock(s.url); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = l
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown protocol %q", s.url.Scheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
s.listener.listen()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Socket) Close() {
|
||||||
|
if s.listener != nil {
|
||||||
|
// Ignore the returned error as we cannot do anything about it anyway
|
||||||
|
if err := s.listener.close(); err != nil {
|
||||||
|
s.log.Warnf("Closing socket failed: %v", err)
|
||||||
|
}
|
||||||
|
s.listener = nil
|
||||||
|
}
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Socket) Address() net.Addr {
|
||||||
|
return s.listener.address()
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,282 @@
|
||||||
|
package socket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var pki = testutil.NewPKI("../../../testutil/pki")
|
||||||
|
|
||||||
|
func TestSocketListener(t *testing.T) {
|
||||||
|
messages := [][]byte{
|
||||||
|
[]byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"),
|
||||||
|
[]byte("test,foo=zab v=3i 123456791\n"),
|
||||||
|
}
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"test",
|
||||||
|
map[string]string{"foo": "bar"},
|
||||||
|
map[string]interface{}{"v": int64(1)},
|
||||||
|
time.Unix(0, 123456789),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"test",
|
||||||
|
map[string]string{"foo": "baz"},
|
||||||
|
map[string]interface{}{"v": int64(2)},
|
||||||
|
time.Unix(0, 123456790),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"test",
|
||||||
|
map[string]string{"foo": "zab"},
|
||||||
|
map[string]interface{}{"v": int64(3)},
|
||||||
|
time.Unix(0, 123456791),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
schema string
|
||||||
|
buffersize config.Size
|
||||||
|
encoding string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "TCP",
|
||||||
|
schema: "tcp",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "TCP with TLS",
|
||||||
|
schema: "tcp+tls",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "TCP with gzip encoding",
|
||||||
|
schema: "tcp",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
encoding: "gzip",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "UDP",
|
||||||
|
schema: "udp",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "UDP with gzip encoding",
|
||||||
|
schema: "udp",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
encoding: "gzip",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix socket",
|
||||||
|
schema: "unix",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix socket with TLS",
|
||||||
|
schema: "unix+tls",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unix socket with gzip encoding",
|
||||||
|
schema: "unix",
|
||||||
|
encoding: "gzip",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unixgram socket",
|
||||||
|
schema: "unixgram",
|
||||||
|
buffersize: config.Size(1024),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
serverTLS := pki.TLSServerConfig()
|
||||||
|
clientTLS := pki.TLSClientConfig()
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
proto := strings.TrimSuffix(tt.schema, "+tls")
|
||||||
|
|
||||||
|
// Prepare the address and socket if needed
|
||||||
|
var serviceAddress string
|
||||||
|
var tlsCfg *tls.Config
|
||||||
|
switch proto {
|
||||||
|
case "tcp", "udp":
|
||||||
|
serviceAddress = proto + "://" + "127.0.0.1:0"
|
||||||
|
case "unix", "unixgram":
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
t.Skip("Skipping on Windows, as unixgram sockets are not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a socket
|
||||||
|
fn := testutil.TempSocket(t)
|
||||||
|
f, err := os.Create(fn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer f.Close()
|
||||||
|
serviceAddress = proto + "://" + fn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the configuration according to test specification
|
||||||
|
cfg := &Config{
|
||||||
|
ContentEncoding: tt.encoding,
|
||||||
|
ReadBufferSize: tt.buffersize,
|
||||||
|
}
|
||||||
|
if strings.HasSuffix(tt.schema, "tls") {
|
||||||
|
cfg.ServerConfig = *serverTLS
|
||||||
|
var err error
|
||||||
|
tlsCfg, err = clientTLS.TLSConfig()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the socket
|
||||||
|
sock, err := cfg.NewSocket(serviceAddress, &testutil.Logger{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create callbacks
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
onData := func(data []byte) {
|
||||||
|
m, err := parser.Parse(data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
acc.AddMetrics(m)
|
||||||
|
}
|
||||||
|
onError := func(err error) {
|
||||||
|
acc.AddError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the listener
|
||||||
|
require.NoError(t, sock.Listen(onData, onError))
|
||||||
|
defer sock.Close()
|
||||||
|
|
||||||
|
addr := sock.Address()
|
||||||
|
|
||||||
|
// Create a noop client
|
||||||
|
// Server is async, so verify no errors at the end.
|
||||||
|
client, err := createClient(serviceAddress, addr, tlsCfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, client.Close())
|
||||||
|
|
||||||
|
// Setup the client for submitting data
|
||||||
|
client, err = createClient(serviceAddress, addr, tlsCfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Send the data with the correct encoding
|
||||||
|
encoder, err := internal.NewContentEncoder(tt.encoding)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for i, msg := range messages {
|
||||||
|
m, err := encoder.Encode(msg)
|
||||||
|
require.NoErrorf(t, err, "encoding failed for msg %d", i)
|
||||||
|
_, err = client.Write(m)
|
||||||
|
require.NoErrorf(t, err, "sending msg %d failed", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the resulting metrics and compare against expected results
|
||||||
|
require.Eventuallyf(t, func() bool {
|
||||||
|
acc.Lock()
|
||||||
|
defer acc.Unlock()
|
||||||
|
return acc.NMetrics() >= uint64(len(expected))
|
||||||
|
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
|
||||||
|
actual := acc.GetTelegrafMetrics()
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSocketListenerStream(t *testing.T) {
|
||||||
|
// Setup the configuration
|
||||||
|
cfg := &Config{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the socket
|
||||||
|
serviceAddress := "tcp://127.0.0.1:0"
|
||||||
|
logger := &testutil.CaptureLogger{}
|
||||||
|
sock, err := cfg.NewSocket(serviceAddress, logger)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create callbacks
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
onData := func(data []byte) {
|
||||||
|
m, err := parser.Parse(data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
acc.AddMetrics(m)
|
||||||
|
}
|
||||||
|
onError := func(err error) {
|
||||||
|
acc.AddError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the listener
|
||||||
|
require.NoError(t, sock.Listen(onData, onError))
|
||||||
|
defer sock.Close()
|
||||||
|
|
||||||
|
addr := sock.Address()
|
||||||
|
|
||||||
|
// Create a noop client
|
||||||
|
client, err := createClient(serviceAddress, addr, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = client.Write([]byte("test value=42i\n"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
acc.Lock()
|
||||||
|
defer acc.Unlock()
|
||||||
|
return acc.NMetrics() >= 1
|
||||||
|
}, time.Second, 100*time.Millisecond, "did not receive metric")
|
||||||
|
|
||||||
|
// This has to be a stream-listener...
|
||||||
|
listener, ok := sock.listener.(*streamListener)
|
||||||
|
require.True(t, ok)
|
||||||
|
listener.Lock()
|
||||||
|
conns := len(listener.connections)
|
||||||
|
listener.Unlock()
|
||||||
|
require.NotZero(t, conns)
|
||||||
|
|
||||||
|
sock.Close()
|
||||||
|
|
||||||
|
// Verify that plugin.Stop() closed the client's connection
|
||||||
|
_ = client.SetReadDeadline(time.Now().Add(time.Second))
|
||||||
|
buf := []byte{1}
|
||||||
|
_, err = client.Read(buf)
|
||||||
|
require.Equal(t, err, io.EOF)
|
||||||
|
|
||||||
|
require.Empty(t, logger.Errors())
|
||||||
|
require.Empty(t, logger.Warnings())
|
||||||
|
}
|
||||||
|
|
||||||
|
func createClient(endpoint string, addr net.Addr, tlsCfg *tls.Config) (net.Conn, error) {
|
||||||
|
// Determine the protocol in a crude fashion
|
||||||
|
parts := strings.SplitN(endpoint, "://", 2)
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return nil, fmt.Errorf("invalid endpoint %q", endpoint)
|
||||||
|
}
|
||||||
|
protocol := parts[0]
|
||||||
|
|
||||||
|
if tlsCfg == nil {
|
||||||
|
return net.Dial(protocol, addr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
if protocol == "unix" {
|
||||||
|
tlsCfg.InsecureSkipVerify = true
|
||||||
|
}
|
||||||
|
return tls.Dial(protocol, addr.String(), tlsCfg)
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package socket_listener
|
package socket
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package socket_listener
|
package socket
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
|
@ -34,11 +34,12 @@ type streamListener struct {
|
||||||
ReadTimeout config.Duration
|
ReadTimeout config.Duration
|
||||||
KeepAlivePeriod *config.Duration
|
KeepAlivePeriod *config.Duration
|
||||||
Splitter bufio.SplitFunc
|
Splitter bufio.SplitFunc
|
||||||
Parser telegraf.Parser
|
OnData CallbackData
|
||||||
|
OnError CallbackError
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
|
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
connections map[net.Conn]struct{}
|
connections map[net.Conn]bool
|
||||||
path string
|
path string
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
@ -124,7 +125,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
return fmt.Errorf("unable to accept connection from %q: too many connections", addr)
|
return fmt.Errorf("unable to accept connection from %q: too many connections", addr)
|
||||||
}
|
}
|
||||||
l.connections[conn] = struct{}{}
|
l.connections[conn] = true
|
||||||
l.Unlock()
|
l.Unlock()
|
||||||
|
|
||||||
if l.ReadBufferSize > 0 {
|
if l.ReadBufferSize > 0 {
|
||||||
|
|
@ -169,7 +170,7 @@ func (l *streamListener) closeConnection(conn net.Conn) {
|
||||||
delete(l.connections, conn)
|
delete(l.connections, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *streamListener) addr() net.Addr {
|
func (l *streamListener) address() net.Addr {
|
||||||
return l.listener.Addr()
|
return l.listener.Addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -195,8 +196,8 @@ func (l *streamListener) close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *streamListener) listen(acc telegraf.Accumulator) {
|
func (l *streamListener) listen() {
|
||||||
l.connections = make(map[net.Conn]struct{})
|
l.connections = make(map[net.Conn]bool)
|
||||||
|
|
||||||
l.wg.Add(1)
|
l.wg.Add(1)
|
||||||
defer l.wg.Done()
|
defer l.wg.Done()
|
||||||
|
|
@ -205,23 +206,25 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
|
||||||
for {
|
for {
|
||||||
conn, err := l.listener.Accept()
|
conn, err := l.listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, net.ErrClosed) {
|
if !errors.Is(err, net.ErrClosed) && l.OnError != nil {
|
||||||
acc.AddError(err)
|
l.OnError(err)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := l.setupConnection(conn); err != nil {
|
if err := l.setupConnection(conn); err != nil && l.OnError != nil {
|
||||||
acc.AddError(err)
|
l.OnError(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(c net.Conn) {
|
go func(c net.Conn) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := l.read(acc, c); err != nil {
|
if err := l.read(c); err != nil {
|
||||||
if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) {
|
if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) {
|
||||||
acc.AddError(err)
|
if l.OnError != nil {
|
||||||
|
l.OnError(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
l.Lock()
|
l.Lock()
|
||||||
|
|
@ -232,7 +235,7 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *streamListener) read(acc telegraf.Accumulator, conn net.Conn) error {
|
func (l *streamListener) read(conn net.Conn) error {
|
||||||
decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn)
|
decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating decoder failed: %w", err)
|
return fmt.Errorf("creating decoder failed: %w", err)
|
||||||
|
|
@ -259,15 +262,7 @@ func (l *streamListener) read(acc telegraf.Accumulator, conn net.Conn) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
data := scanner.Bytes()
|
data := scanner.Bytes()
|
||||||
metrics, err := l.Parser.Parse(data)
|
l.OnData(data)
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("parsing error: %w", err))
|
|
||||||
l.Log.Debugf("invalid data for parser: %v", data)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, m := range metrics {
|
|
||||||
acc.AddMetric(m)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
|
|
@ -44,55 +44,42 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
# service_address = "unixgram:///tmp/telegraf.sock"
|
# service_address = "unixgram:///tmp/telegraf.sock"
|
||||||
# service_address = "vsock://cid:port"
|
# service_address = "vsock://cid:port"
|
||||||
|
|
||||||
## Change the file mode bits on unix sockets. These permissions may not be
|
## Permission for unix sockets (only available on unix sockets)
|
||||||
## respected by some platforms, to safely restrict write permissions it is best
|
## This setting may not be respected by some platforms. To safely restrict
|
||||||
## to place the socket into a directory that has previously been created
|
## permissions it is recommended to place the socket into a previously
|
||||||
## with the desired permissions.
|
## created directory with the desired permissions.
|
||||||
## ex: socket_mode = "777"
|
## ex: socket_mode = "777"
|
||||||
# socket_mode = ""
|
# socket_mode = ""
|
||||||
|
|
||||||
## Maximum number of concurrent connections.
|
## Maximum number of concurrent connections (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
## Zero means unlimited.
|
||||||
## 0 (default) is unlimited.
|
# max_connections = 0
|
||||||
# max_connections = 1024
|
|
||||||
|
|
||||||
## Read timeout.
|
## Read timeout (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
## Zero means unlimited.
|
||||||
## 0 (default) is unlimited.
|
# read_timeout = "0s"
|
||||||
# read_timeout = "30s"
|
|
||||||
|
|
||||||
## Optional TLS configuration.
|
## Optional TLS configuration (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
# tls_key = "/etc/telegraf/key.pem"
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
## Enables client authentication if set.
|
## Enables client authentication if set.
|
||||||
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||||
|
|
||||||
## Maximum socket buffer size (in bytes when no unit specified).
|
## Maximum socket buffer size (in bytes when no unit specified)
|
||||||
## For stream sockets, once the buffer fills up, the sender will start backing up.
|
## For stream sockets, once the buffer fills up, the sender will start
|
||||||
## For datagram sockets, once the buffer fills up, metrics will start dropping.
|
## backing up. For datagram sockets, once the buffer fills up, metrics will
|
||||||
## Defaults to the OS default.
|
## start dropping. Defaults to the OS default.
|
||||||
# read_buffer_size = "64KiB"
|
# read_buffer_size = "64KiB"
|
||||||
|
|
||||||
## Period between keep alive probes.
|
## Period between keep alive probes (only applies to TCP sockets)
|
||||||
## Only applies to TCP sockets.
|
## Zero disables keep alive probes. Defaults to the OS configuration.
|
||||||
## 0 disables keep alive probes.
|
|
||||||
## Defaults to the OS configuration.
|
|
||||||
# keep_alive_period = "5m"
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to consume.
|
## Content encoding for message payloads
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Can be set to "gzip" for compressed payloads or "identity" for no encoding.
|
||||||
## more about them here:
|
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
|
||||||
# data_format = "influx"
|
|
||||||
|
|
||||||
## Content encoding for message payloads, can be set to "gzip" to or
|
|
||||||
## "identity" to apply no encoding.
|
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
## Maximum size of decoded packet.
|
## Maximum size of decoded packet (in bytes when no unit specified)
|
||||||
## Acceptable units are B, KiB, KB, MiB, MB...
|
|
||||||
## Without quotes and units, interpreted as size in bytes.
|
|
||||||
# max_decompression_size = "500MB"
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Message splitting strategy and corresponding settings for stream sockets
|
## Message splitting strategy and corresponding settings for stream sockets
|
||||||
|
|
@ -132,6 +119,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## is passed on to the parser together with the message.
|
## is passed on to the parser together with the message.
|
||||||
## Note: This setting is only used for splitting_strategy = "variable length".
|
## Note: This setting is only used for splitting_strategy = "variable length".
|
||||||
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
|
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
# data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
||||||
## A Note on UDP OS Buffer Sizes
|
## A Note on UDP OS Buffer Sizes
|
||||||
|
|
|
||||||
|
|
@ -13,55 +13,42 @@
|
||||||
# service_address = "unixgram:///tmp/telegraf.sock"
|
# service_address = "unixgram:///tmp/telegraf.sock"
|
||||||
# service_address = "vsock://cid:port"
|
# service_address = "vsock://cid:port"
|
||||||
|
|
||||||
## Change the file mode bits on unix sockets. These permissions may not be
|
## Permission for unix sockets (only available on unix sockets)
|
||||||
## respected by some platforms, to safely restrict write permissions it is best
|
## This setting may not be respected by some platforms. To safely restrict
|
||||||
## to place the socket into a directory that has previously been created
|
## permissions it is recommended to place the socket into a previously
|
||||||
## with the desired permissions.
|
## created directory with the desired permissions.
|
||||||
## ex: socket_mode = "777"
|
## ex: socket_mode = "777"
|
||||||
# socket_mode = ""
|
# socket_mode = ""
|
||||||
|
|
||||||
## Maximum number of concurrent connections.
|
## Maximum number of concurrent connections (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
## Zero means unlimited.
|
||||||
## 0 (default) is unlimited.
|
# max_connections = 0
|
||||||
# max_connections = 1024
|
|
||||||
|
|
||||||
## Read timeout.
|
## Read timeout (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
## Zero means unlimited.
|
||||||
## 0 (default) is unlimited.
|
# read_timeout = "0s"
|
||||||
# read_timeout = "30s"
|
|
||||||
|
|
||||||
## Optional TLS configuration.
|
## Optional TLS configuration (only available on stream sockets like TCP)
|
||||||
## Only applies to stream sockets (e.g. TCP).
|
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
# tls_key = "/etc/telegraf/key.pem"
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
## Enables client authentication if set.
|
## Enables client authentication if set.
|
||||||
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||||
|
|
||||||
## Maximum socket buffer size (in bytes when no unit specified).
|
## Maximum socket buffer size (in bytes when no unit specified)
|
||||||
## For stream sockets, once the buffer fills up, the sender will start backing up.
|
## For stream sockets, once the buffer fills up, the sender will start
|
||||||
## For datagram sockets, once the buffer fills up, metrics will start dropping.
|
## backing up. For datagram sockets, once the buffer fills up, metrics will
|
||||||
## Defaults to the OS default.
|
## start dropping. Defaults to the OS default.
|
||||||
# read_buffer_size = "64KiB"
|
# read_buffer_size = "64KiB"
|
||||||
|
|
||||||
## Period between keep alive probes.
|
## Period between keep alive probes (only applies to TCP sockets)
|
||||||
## Only applies to TCP sockets.
|
## Zero disables keep alive probes. Defaults to the OS configuration.
|
||||||
## 0 disables keep alive probes.
|
|
||||||
## Defaults to the OS configuration.
|
|
||||||
# keep_alive_period = "5m"
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to consume.
|
## Content encoding for message payloads
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Can be set to "gzip" for compressed payloads or "identity" for no encoding.
|
||||||
## more about them here:
|
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
|
||||||
# data_format = "influx"
|
|
||||||
|
|
||||||
## Content encoding for message payloads, can be set to "gzip" to or
|
|
||||||
## "identity" to apply no encoding.
|
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
## Maximum size of decoded packet.
|
## Maximum size of decoded packet (in bytes when no unit specified)
|
||||||
## Acceptable units are B, KiB, KB, MiB, MB...
|
|
||||||
## Without quotes and units, interpreted as size in bytes.
|
|
||||||
# max_decompression_size = "500MB"
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Message splitting strategy and corresponding settings for stream sockets
|
## Message splitting strategy and corresponding settings for stream sockets
|
||||||
|
|
@ -101,3 +88,9 @@
|
||||||
## is passed on to the parser together with the message.
|
## is passed on to the parser together with the message.
|
||||||
## Note: This setting is only used for splitting_strategy = "variable length".
|
## Note: This setting is only used for splitting_strategy = "variable length".
|
||||||
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
|
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
# data_format = "influx"
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
# Generic socket listener capable of handling multiple socket types.
|
||||||
|
[[inputs.socket_listener]]
|
||||||
|
## URL to listen on
|
||||||
|
# service_address = "tcp://:8094"
|
||||||
|
# service_address = "tcp://127.0.0.1:http"
|
||||||
|
# service_address = "tcp4://:8094"
|
||||||
|
# service_address = "tcp6://:8094"
|
||||||
|
# service_address = "tcp6://[2001:db8::1]:8094"
|
||||||
|
# service_address = "udp://:8094"
|
||||||
|
# service_address = "udp4://:8094"
|
||||||
|
# service_address = "udp6://:8094"
|
||||||
|
# service_address = "unix:///tmp/telegraf.sock"
|
||||||
|
# service_address = "unixgram:///tmp/telegraf.sock"
|
||||||
|
# service_address = "vsock://cid:port"
|
||||||
|
|
||||||
|
{{template "/plugins/common/socket/sample.conf"}}
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
# data_format = "influx"
|
||||||
|
|
@ -1,62 +1,25 @@
|
||||||
|
//go:generate ../../../tools/config_includer/generator
|
||||||
//go:generate ../../../tools/readme_config_includer/generator
|
//go:generate ../../../tools/readme_config_includer/generator
|
||||||
package socket_listener
|
package socket_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"encoding/binary"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/plugins/common/socket"
|
||||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
type listener interface {
|
|
||||||
listen(acc telegraf.Accumulator)
|
|
||||||
addr() net.Addr
|
|
||||||
close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type lengthFieldSpec struct {
|
|
||||||
Offset int64 `toml:"offset"`
|
|
||||||
Bytes int64 `toml:"bytes"`
|
|
||||||
Endianness string `toml:"endianness"`
|
|
||||||
HeaderLength int64 `toml:"header_length"`
|
|
||||||
converter func([]byte) int
|
|
||||||
}
|
|
||||||
|
|
||||||
type SocketListener struct {
|
type SocketListener struct {
|
||||||
ServiceAddress string `toml:"service_address"`
|
ServiceAddress string `toml:"service_address"`
|
||||||
MaxConnections int `toml:"max_connections"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
ReadBufferSize config.Size `toml:"read_buffer_size"`
|
socket.Config
|
||||||
ReadTimeout config.Duration `toml:"read_timeout"`
|
|
||||||
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
|
|
||||||
SocketMode string `toml:"socket_mode"`
|
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
|
||||||
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
|
||||||
SplittingStrategy string `toml:"splitting_strategy"`
|
|
||||||
SplittingDelimiter string `toml:"splitting_delimiter"`
|
|
||||||
SplittingLength int `toml:"splitting_length"`
|
|
||||||
SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"`
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
|
||||||
tlsint.ServerConfig
|
|
||||||
|
|
||||||
wg sync.WaitGroup
|
socket *socket.Socket
|
||||||
parser telegraf.Parser
|
parser telegraf.Parser
|
||||||
splitter bufio.SplitFunc
|
|
||||||
|
|
||||||
listener listener
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*SocketListener) SampleConfig() string {
|
func (*SocketListener) SampleConfig() string {
|
||||||
|
|
@ -64,69 +27,12 @@ func (*SocketListener) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *SocketListener) Init() error {
|
func (sl *SocketListener) Init() error {
|
||||||
switch sl.SplittingStrategy {
|
sock, err := sl.Config.NewSocket(sl.ServiceAddress, sl.Log)
|
||||||
case "", "newline":
|
if err != nil {
|
||||||
sl.splitter = bufio.ScanLines
|
return err
|
||||||
case "null":
|
|
||||||
sl.splitter = scanNull
|
|
||||||
case "delimiter":
|
|
||||||
re := regexp.MustCompile(`(\s*0?x)`)
|
|
||||||
d := re.ReplaceAllString(strings.ToLower(sl.SplittingDelimiter), "")
|
|
||||||
delimiter, err := hex.DecodeString(d)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decoding delimiter failed: %w", err)
|
|
||||||
}
|
|
||||||
sl.splitter = createScanDelimiter(delimiter)
|
|
||||||
case "fixed length":
|
|
||||||
sl.splitter = createScanFixedLength(sl.SplittingLength)
|
|
||||||
case "variable length":
|
|
||||||
// Create the converter function
|
|
||||||
var order binary.ByteOrder
|
|
||||||
switch strings.ToLower(sl.SplittingLengthField.Endianness) {
|
|
||||||
case "", "be":
|
|
||||||
order = binary.BigEndian
|
|
||||||
case "le":
|
|
||||||
order = binary.LittleEndian
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("invalid 'endianness' %q", sl.SplittingLengthField.Endianness)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch sl.SplittingLengthField.Bytes {
|
|
||||||
case 1:
|
|
||||||
sl.SplittingLengthField.converter = func(b []byte) int {
|
|
||||||
return int(b[0])
|
|
||||||
}
|
|
||||||
case 2:
|
|
||||||
sl.SplittingLengthField.converter = func(b []byte) int {
|
|
||||||
return int(order.Uint16(b))
|
|
||||||
}
|
|
||||||
case 4:
|
|
||||||
sl.SplittingLengthField.converter = func(b []byte) int {
|
|
||||||
return int(order.Uint32(b))
|
|
||||||
}
|
|
||||||
case 8:
|
|
||||||
sl.SplittingLengthField.converter = func(b []byte) int {
|
|
||||||
return int(order.Uint64(b))
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
sl.SplittingLengthField.converter = func(b []byte) int {
|
|
||||||
buf := make([]byte, 8)
|
|
||||||
start := 0
|
|
||||||
if order == binary.BigEndian {
|
|
||||||
start = 8 - len(b)
|
|
||||||
}
|
|
||||||
for i := 0; i < len(b); i++ {
|
|
||||||
buf[start+i] = b[i]
|
|
||||||
}
|
|
||||||
return int(order.Uint64(buf))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have enough bytes in the header
|
|
||||||
sl.splitter = createScanVariableLength(sl.SplittingLengthField)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unknown 'splitting_strategy' %q", sl.SplittingStrategy)
|
|
||||||
}
|
}
|
||||||
|
sl.socket = sock
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -139,128 +45,35 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
// Resolve the interface to an address if any given
|
// Create the callbacks for parsing the data and recording issues
|
||||||
var ifname string
|
onData := func(data []byte) {
|
||||||
ifregex := regexp.MustCompile(`%([\w\.]+)`)
|
metrics, err := sl.parser.Parse(data)
|
||||||
if matches := ifregex.FindStringSubmatch(sl.ServiceAddress); len(matches) == 2 {
|
if err != nil {
|
||||||
ifname := matches[1]
|
acc.AddError(err)
|
||||||
sl.ServiceAddress = strings.Replace(sl.ServiceAddress, "%"+ifname, "", 1)
|
return
|
||||||
|
}
|
||||||
|
for _, m := range metrics {
|
||||||
|
acc.AddMetric(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
onError := func(err error) {
|
||||||
|
acc.AddError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Preparing TLS configuration
|
// Start the listener
|
||||||
tlsCfg, err := sl.ServerConfig.TLSConfig()
|
if err := sl.socket.Listen(onData, onError); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return fmt.Errorf("getting TLS config failed: %w", err)
|
|
||||||
}
|
}
|
||||||
|
addr := sl.socket.Address()
|
||||||
// Setup the network connection
|
sl.Log.Infof("Listening on %s://%s", addr.Network(), addr.String())
|
||||||
u, err := url.Parse(sl.ServiceAddress)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("parsing address failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch u.Scheme {
|
|
||||||
case "tcp", "tcp4", "tcp6":
|
|
||||||
ssl := &streamListener{
|
|
||||||
ReadBufferSize: int(sl.ReadBufferSize),
|
|
||||||
ReadTimeout: sl.ReadTimeout,
|
|
||||||
KeepAlivePeriod: sl.KeepAlivePeriod,
|
|
||||||
MaxConnections: sl.MaxConnections,
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
Splitter: sl.splitter,
|
|
||||||
Parser: sl.parser,
|
|
||||||
Log: sl.Log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ssl.setupTCP(u, tlsCfg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = ssl
|
|
||||||
case "unix", "unixpacket":
|
|
||||||
ssl := &streamListener{
|
|
||||||
ReadBufferSize: int(sl.ReadBufferSize),
|
|
||||||
ReadTimeout: sl.ReadTimeout,
|
|
||||||
KeepAlivePeriod: sl.KeepAlivePeriod,
|
|
||||||
MaxConnections: sl.MaxConnections,
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
Splitter: sl.splitter,
|
|
||||||
Parser: sl.parser,
|
|
||||||
Log: sl.Log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ssl.setupUnix(u, tlsCfg, sl.SocketMode); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = ssl
|
|
||||||
|
|
||||||
case "udp", "udp4", "udp6":
|
|
||||||
psl := &packetListener{
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
|
||||||
Parser: sl.parser,
|
|
||||||
}
|
|
||||||
if err := psl.setupUDP(u, ifname, int(sl.ReadBufferSize)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = psl
|
|
||||||
case "ip", "ip4", "ip6":
|
|
||||||
psl := &packetListener{
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
|
||||||
Parser: sl.parser,
|
|
||||||
}
|
|
||||||
if err := psl.setupIP(u); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = psl
|
|
||||||
case "unixgram":
|
|
||||||
psl := &packetListener{
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
|
||||||
Parser: sl.parser,
|
|
||||||
}
|
|
||||||
if err := psl.setupUnixgram(u, sl.SocketMode); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = psl
|
|
||||||
case "vsock":
|
|
||||||
ssl := &streamListener{
|
|
||||||
ReadBufferSize: int(sl.ReadBufferSize),
|
|
||||||
ReadTimeout: sl.ReadTimeout,
|
|
||||||
KeepAlivePeriod: sl.KeepAlivePeriod,
|
|
||||||
MaxConnections: sl.MaxConnections,
|
|
||||||
Encoding: sl.ContentEncoding,
|
|
||||||
Splitter: sl.splitter,
|
|
||||||
Parser: sl.parser,
|
|
||||||
Log: sl.Log,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ssl.setupVsock(u); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sl.listener = ssl
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unknown protocol %q in %q", u.Scheme, sl.ServiceAddress)
|
|
||||||
}
|
|
||||||
|
|
||||||
sl.Log.Infof("Listening on %s://%s", u.Scheme, sl.listener.addr())
|
|
||||||
|
|
||||||
sl.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer sl.wg.Done()
|
|
||||||
sl.listener.listen(acc)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *SocketListener) Stop() {
|
func (sl *SocketListener) Stop() {
|
||||||
if sl.listener != nil {
|
if sl.socket != nil {
|
||||||
// Ignore the returned error as we cannot do anything about it anyway
|
sl.socket.Close()
|
||||||
_ = sl.listener.close()
|
|
||||||
sl.listener = nil
|
|
||||||
}
|
}
|
||||||
sl.wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
@ -22,6 +21,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/socket"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
|
|
@ -137,10 +137,12 @@ func TestSocketListener(t *testing.T) {
|
||||||
|
|
||||||
// Setup plugin according to test specification
|
// Setup plugin according to test specification
|
||||||
plugin := &SocketListener{
|
plugin := &SocketListener{
|
||||||
Log: &testutil.Logger{},
|
ServiceAddress: proto + "://" + serverAddr,
|
||||||
ServiceAddress: proto + "://" + serverAddr,
|
Config: socket.Config{
|
||||||
ContentEncoding: tt.encoding,
|
ContentEncoding: tt.encoding,
|
||||||
ReadBufferSize: tt.buffersize,
|
ReadBufferSize: tt.buffersize,
|
||||||
|
},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(tt.schema, "tls") {
|
if strings.HasSuffix(tt.schema, "tls") {
|
||||||
plugin.ServerConfig = *serverTLS
|
plugin.ServerConfig = *serverTLS
|
||||||
|
|
@ -158,7 +160,7 @@ func TestSocketListener(t *testing.T) {
|
||||||
require.NoError(t, plugin.Start(&acc))
|
require.NoError(t, plugin.Start(&acc))
|
||||||
defer plugin.Stop()
|
defer plugin.Stop()
|
||||||
|
|
||||||
addr := plugin.listener.addr()
|
addr := plugin.socket.Address()
|
||||||
|
|
||||||
// Create a noop client
|
// Create a noop client
|
||||||
// Server is async, so verify no errors at the end.
|
// Server is async, so verify no errors at the end.
|
||||||
|
|
@ -193,59 +195,6 @@ func TestSocketListener(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSocketListenerStream(t *testing.T) {
|
|
||||||
logger := &testutil.CaptureLogger{}
|
|
||||||
|
|
||||||
plugin := &SocketListener{
|
|
||||||
Log: logger,
|
|
||||||
ServiceAddress: "tcp://127.0.0.1:0",
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
}
|
|
||||||
parser := &influx.Parser{}
|
|
||||||
require.NoError(t, parser.Init())
|
|
||||||
plugin.SetParser(parser)
|
|
||||||
|
|
||||||
// Start the plugin
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
require.NoError(t, plugin.Init())
|
|
||||||
require.NoError(t, plugin.Start(&acc))
|
|
||||||
defer plugin.Stop()
|
|
||||||
|
|
||||||
addr := plugin.listener.addr()
|
|
||||||
|
|
||||||
// Create a noop client
|
|
||||||
client, err := createClient(plugin.ServiceAddress, addr, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = client.Write([]byte("test value=42i\n"))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
acc.Lock()
|
|
||||||
defer acc.Unlock()
|
|
||||||
return acc.NMetrics() >= 1
|
|
||||||
}, time.Second, 100*time.Millisecond, "did not receive metric")
|
|
||||||
|
|
||||||
// This has to be a stream-listener...
|
|
||||||
listener, ok := plugin.listener.(*streamListener)
|
|
||||||
require.True(t, ok)
|
|
||||||
listener.Lock()
|
|
||||||
conns := len(listener.connections)
|
|
||||||
listener.Unlock()
|
|
||||||
require.NotZero(t, conns)
|
|
||||||
|
|
||||||
plugin.Stop()
|
|
||||||
|
|
||||||
// Verify that plugin.Stop() closed the client's connection
|
|
||||||
_ = client.SetReadDeadline(time.Now().Add(time.Second))
|
|
||||||
buf := []byte{1}
|
|
||||||
_, err = client.Read(buf)
|
|
||||||
require.Equal(t, err, io.EOF)
|
|
||||||
|
|
||||||
require.Empty(t, logger.Errors())
|
|
||||||
require.Empty(t, logger.Warnings())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCases(t *testing.T) {
|
func TestCases(t *testing.T) {
|
||||||
// Get all directories in testdata
|
// Get all directories in testdata
|
||||||
folders, err := os.ReadDir("testcases")
|
folders, err := os.ReadDir("testcases")
|
||||||
|
|
@ -314,7 +263,7 @@ func TestCases(t *testing.T) {
|
||||||
defer plugin.Stop()
|
defer plugin.Stop()
|
||||||
|
|
||||||
// Create a client without TLS
|
// Create a client without TLS
|
||||||
addr := plugin.listener.addr()
|
addr := plugin.socket.Address()
|
||||||
client, err := createClient(plugin.ServiceAddress, addr, nil)
|
client, err := createClient(plugin.ServiceAddress, addr, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1 +1 @@
|
||||||
parsing error: metric parse error
|
metric parse error
|
||||||
Loading…
Reference in New Issue