feat(inputs.socket_listener): Allow to specify message separator for streams (#12187)

This commit is contained in:
Sven Rebhan 2022-11-17 15:02:32 +01:00 committed by GitHub
parent 7ef5993d35
commit 3160d52187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 543 additions and 11 deletions

View File

@ -77,6 +77,44 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Message splitting strategy and corresponding settings for stream sockets
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
## listeners such as udp.
## Available strategies are:
## newline -- split at newlines (default)
## null -- split at null bytes
## delimiter -- split at delimiter byte-sequence in hex-format
## given in `splitting_delimiter`
## fixed length -- split after number of bytes given in `splitting_length`
## variable length -- split depending on length information received in the
## data. The length field information is specified in
## `splitting_length_field`.
# splitting_strategy = "newline"
## Delimiter used to split received data to messages consumed by the parser.
## The delimiter is a hex byte-sequence marking the end of a message
## e.g. "0x0D0A", "x0d0a" or "0d0a" marks a Windows line-break (CR LF).
## The value is case-insensitive and can be specified with "0x" or "x" prefix
## or without.
## Note: This setting is only used for splitting_strategy = "delimiter".
# splitting_delimiter = ""
## Fixed length of a message in bytes.
## Note: This setting is only used for splitting_strategy = "fixed length".
# splitting_length = 0
## Specification of the length field contained in the data to split messages
## with variable length. The specification contains the following fields:
## offset -- start of length field in bytes from begin of data
## bytes -- length of length field in bytes
## endianness -- endianness of the value, either "be" for big endian or
## "le" for little endian
## header_length -- total length of header to be skipped when passing
## data on to the parser. If zero (default), the header
## is passed on to the parser together with the message.
## Note: This setting is only used for splitting_strategy = "variable length".
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}
```
## A Note on UDP OS Buffer Sizes
@ -148,3 +186,12 @@ sysctl -w kern.ipc.maxsockbuf=9646900
```
[1]: https://github.com/freebsd/freebsd/blob/master/sys/kern/uipc_sockbuf.c#L63-L64
## Metrics
The plugin accepts arbitrary input and parses it according to the `data_format`
setting. There is no predefined metric format.
## Example Output
There is no predefined metric format, so output depends on plugin input.

View File

@ -57,3 +57,41 @@
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Message splitting strategy and corresponding settings for stream sockets
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
## listeners such as udp.
## Available strategies are:
## newline -- split at newlines (default)
## null -- split at null bytes
## delimiter -- split at delimiter byte-sequence in hex-format
## given in `splitting_delimiter`
## fixed length -- split after number of bytes given in `splitting_length`
## variable length -- split depending on length information received in the
## data. The length field information is specified in
## `splitting_length_field`.
# splitting_strategy = "newline"
## Delimiter used to split received data to messages consumed by the parser.
## The delimiter is a hex byte-sequence marking the end of a message
## e.g. "0x0D0A", "x0d0a" or "0d0a" marks a Windows line-break (CR LF).
## The value is case-insensitive and can be specified with "0x" or "x" prefix
## or without.
## Note: This setting is only used for splitting_strategy = "delimiter".
# splitting_delimiter = ""
## Fixed length of a message in bytes.
## Note: This setting is only used for splitting_strategy = "fixed length".
# splitting_length = 0
## Specification of the length field contained in the data to split messages
## with variable length. The specification contains the following fields:
## offset -- start of length field in bytes from begin of data
## bytes -- length of length field in bytes
## endianness -- endianness of the value, either "be" for big endian or
## "le" for little endian
## header_length -- total length of header to be skipped when passing
## data on to the parser. If zero (default), the header
## is passed on to the parser together with the message.
## Note: This setting is only used for splitting_strategy = "variable length".
# splitting_length_field = {offset = 0, bytes = 0, endianness = "be", header_length = 0}

View File

@ -2,7 +2,10 @@
package socket_listener
import (
"bufio"
_ "embed"
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"net/url"
@ -26,19 +29,32 @@ type listener interface {
close() error
}
// lengthFieldSpec describes where the length field of a variable-length
// message is located in the received data and how it is interpreted.
type lengthFieldSpec struct {
// Offset is the start of the length field in bytes, counted from the
// beginning of the data.
Offset int64 `toml:"offset"`
// Bytes is the size of the length field in bytes.
Bytes int64 `toml:"bytes"`
// Endianness is the byte order of the length field, "be" for big endian
// or "le" for little endian. An empty value is treated as big endian.
Endianness string `toml:"endianness"`
// HeaderLength is the number of leading bytes skipped when handing the
// message to the parser; it is also added to the value of the length field
// to determine the end of the message.
HeaderLength int64 `toml:"header_length"`
// converter turns the raw bytes of the length field into a number. It is
// set up during Init according to Bytes and Endianness.
converter func([]byte) int
}
// SocketListener holds the configuration and runtime state of the
// socket_listener input plugin.
type SocketListener struct {
ServiceAddress string `toml:"service_address"`
MaxConnections int `toml:"max_connections"`
ReadBufferSize config.Size `toml:"read_buffer_size"`
ReadTimeout config.Duration `toml:"read_timeout"`
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger `toml:"-"`
ServiceAddress string `toml:"service_address"`
MaxConnections int `toml:"max_connections"`
ReadBufferSize config.Size `toml:"read_buffer_size"`
ReadTimeout config.Duration `toml:"read_timeout"`
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
// SplittingStrategy selects how the stream is split into messages; it is
// evaluated in Init ("newline", "null", "delimiter", "fixed length" or
// "variable length", with an empty value meaning "newline").
SplittingStrategy string `toml:"splitting_strategy"`
// SplittingDelimiter is the hex-encoded delimiter for the "delimiter" strategy.
SplittingDelimiter string `toml:"splitting_delimiter"`
// SplittingLength is the message size in bytes for the "fixed length" strategy.
SplittingLength int `toml:"splitting_length"`
// SplittingLengthField describes the length field for the "variable length" strategy.
SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"`
Log telegraf.Logger `toml:"-"`
tlsint.ServerConfig
wg sync.WaitGroup
parser parsers.Parser
wg sync.WaitGroup
parser parsers.Parser
// splitter is the split function chosen in Init and handed to the
// listeners in Start.
splitter bufio.SplitFunc
listener listener
}
@ -47,6 +63,73 @@ func (*SocketListener) SampleConfig() string {
return sampleConfig
}
// Init validates the message-splitting settings and prepares the split
// function used for stream sockets.
//
// An empty `splitting_strategy` is treated as "newline". Settings that can
// never yield a usable splitter are rejected with an error at this point: a
// delimiter that does not decode to at least one byte, a non-positive fixed
// length, and a length-field specification with a negative offset, a negative
// header length or a field width outside of 1 to 8 bytes.
func (sl *SocketListener) Init() error {
	switch sl.SplittingStrategy {
	case "", "newline":
		sl.splitter = bufio.ScanLines
	case "null":
		sl.splitter = scanNull
	case "delimiter":
		// Strip the optional "0x"/"x" prefixes (and whitespace in front of
		// them) so only the plain hex digits remain for decoding.
		re := regexp.MustCompile(`(\s*0?x)`)
		d := re.ReplaceAllString(strings.ToLower(sl.SplittingDelimiter), "")
		delimiter, err := hex.DecodeString(d)
		if err != nil {
			return fmt.Errorf("decoding delimiter failed: %w", err)
		}
		// An empty delimiter matches at position zero of any input, so the
		// split function would emit empty messages without consuming data.
		if len(delimiter) == 0 {
			return fmt.Errorf("'splitting_delimiter' must not be empty for 'splitting_strategy' %q", sl.SplittingStrategy)
		}
		sl.splitter = createScanDelimiter(delimiter)
	case "fixed length":
		// A zero length never advances the input and a negative one cannot
		// be used as a slice bound.
		if sl.SplittingLength <= 0 {
			return fmt.Errorf("invalid 'splitting_length' %d, must be greater than zero", sl.SplittingLength)
		}
		sl.splitter = createScanFixedLength(sl.SplittingLength)
	case "variable length":
		field := &sl.SplittingLengthField

		// Reject layouts that cannot be used to slice the received data.
		if field.Offset < 0 {
			return fmt.Errorf("invalid 'offset' %d, must not be negative", field.Offset)
		}
		if field.Bytes < 1 || field.Bytes > 8 {
			return fmt.Errorf("invalid 'bytes' %d, must be between 1 and 8", field.Bytes)
		}
		if field.HeaderLength < 0 {
			return fmt.Errorf("invalid 'header_length' %d, must not be negative", field.HeaderLength)
		}

		// Create the converter function
		var order binary.ByteOrder
		switch strings.ToLower(field.Endianness) {
		case "", "be":
			order = binary.BigEndian
		case "le":
			order = binary.LittleEndian
		default:
			return fmt.Errorf("invalid 'endianess' %q", field.Endianness)
		}

		switch field.Bytes {
		case 1:
			field.converter = func(b []byte) int {
				return int(b[0])
			}
		case 2:
			field.converter = func(b []byte) int {
				return int(order.Uint16(b))
			}
		case 4:
			field.converter = func(b []byte) int {
				return int(order.Uint32(b))
			}
		case 8:
			field.converter = func(b []byte) int {
				return int(order.Uint64(b))
			}
		default:
			// Widths of 3, 5, 6 or 7 bytes have no native integer type, so
			// the value is placed into an 8-byte buffer: right-aligned for
			// big endian, left-aligned for little endian.
			field.converter = func(b []byte) int {
				buf := make([]byte, 8)
				start := 0
				if order == binary.BigEndian {
					start = 8 - len(b)
				}
				copy(buf[start:], b)
				return int(order.Uint64(buf))
			}
		}

		sl.splitter = createScanVariableLength(*field)
	default:
		return fmt.Errorf("unknown 'splitting_strategy' %q", sl.SplittingStrategy)
	}

	return nil
}
// Gather does nothing and always returns nil.
func (*SocketListener) Gather(telegraf.Accumulator) error {
	return nil
}
@ -84,6 +167,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
KeepAlivePeriod: sl.KeepAlivePeriod,
MaxConnections: sl.MaxConnections,
Encoding: sl.ContentEncoding,
Splitter: sl.splitter,
Parser: sl.parser,
Log: sl.Log,
}
@ -99,6 +183,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
KeepAlivePeriod: sl.KeepAlivePeriod,
MaxConnections: sl.MaxConnections,
Encoding: sl.ContentEncoding,
Splitter: sl.splitter,
Parser: sl.parser,
Log: sl.Log,
}

View File

@ -2,20 +2,27 @@ package socket_listener
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -111,7 +118,6 @@ func TestSocketListener(t *testing.T) {
// Prepare the address and socket if needed
var serverAddr string
var tlsCfg *tls.Config
switch proto {
case "tcp", "udp":
serverAddr = "127.0.0.1:0"
@ -147,6 +153,7 @@ func TestSocketListener(t *testing.T) {
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
@ -178,6 +185,159 @@ func TestSocketListener(t *testing.T) {
}
}
// TestCases runs one sub-test per directory found in "testcases". Each
// directory provides a plugin configuration (telegraf.conf), a sequence of
// data to send (sequence.json) and optionally the expected metrics
// (expected.out) and the expected errors (expected.err).
func TestCases(t *testing.T) {
// Get all directories in testcases
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
// Register the plugin
inputs.Add("socket_listener", func() telegraf.Input {
return &SocketListener{}
})
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
// Compare options
options := []cmp.Option{
testutil.IgnoreTime(),
testutil.SortMetrics(),
}
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
configFilename := filepath.Join(testcasePath, "telegraf.conf")
inputFilename := filepath.Join(testcasePath, "sequence.json")
expectedFilename := filepath.Join(testcasePath, "expected.out")
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
// Prepare the influx parser for expectations
parser := &influx.Parser{}
require.NoError(t, parser.Init())
// Read the input sequence
sequence, err := readInputData(inputFilename)
require.NoError(t, err)
require.NotEmpty(t, sequence)
// Read the expected output if any
var expected []telegraf.Metric
if _, err := os.Stat(expectedFilename); err == nil {
var err error
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
require.NoError(t, err)
}
// Read the expected errors if any
var expectedErrors []string
if _, err := os.Stat(expectedErrorFilename); err == nil {
var err error
expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
require.NoError(t, err)
require.NotEmpty(t, expectedErrors)
}
// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(configFilename))
require.Len(t, cfg.Inputs, 1)
// Setup and start the plugin
var acc testutil.Accumulator
plugin := cfg.Inputs[0].Input.(*SocketListener)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Create a client without TLS
addr := plugin.listener.addr()
client, err := createClient(plugin.ServiceAddress, addr, nil)
require.NoError(t, err)
// Write the given sequence; a step with a wait time only pauses and
// sends nothing.
for i, step := range sequence {
if step.Wait > 0 {
time.Sleep(time.Duration(step.Wait))
continue
}
require.NotEmpty(t, step.raw, "nothing to send")
_, err := client.Write(step.raw)
require.NoErrorf(t, err, "writing step %d failed: %v", i, err)
}
require.NoError(t, client.Close())
// Read the number of accumulated errors while holding the accumulator lock
getNErrors := func() int {
acc.Lock()
defer acc.Unlock()
return len(acc.Errors)
}
// Wait until at least the expected number of errors was reported
require.Eventuallyf(t, func() bool {
return getNErrors() >= len(expectedErrors)
}, 3*time.Second, 100*time.Millisecond, "did not receive errors (%d/%d)", getNErrors(), len(expectedErrors))
require.Len(t, acc.Errors, len(expectedErrors))
// Sort the received errors by message before comparing them index by
// index with the expected ones.
sort.SliceStable(acc.Errors, func(i, j int) bool {
return acc.Errors[i].Error() < acc.Errors[j].Error()
})
for i, err := range acc.Errors {
require.ErrorContains(t, err, expectedErrors[i])
}
// Wait until at least the expected number of metrics was received
require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, 3*time.Second, 100*time.Millisecond, "did not receive metrics (%d/%d)", acc.NMetrics(), len(expected))
// Check the metric nevertheless as we might get some metrics despite errors.
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
}
}
// element provides a way to configure the
// write sequence for the socket. A step either carries data to send
// (Message or File, but not both) or a wait time.
type element struct {
// Message is literal text to send.
Message string `json:"message"`
// File names a file, relative to the directory of the sequence file,
// whose content is sent.
File string `json:"file"`
// Wait is the time to pause instead of sending data.
Wait config.Duration `json:"wait"`
// raw holds the bytes to send, filled by readInputData from Message or File.
raw []byte
}
// readInputData loads the JSON-encoded write sequence from filename and
// resolves the payload of every step: a message is used verbatim, a file is
// read relative to the directory containing filename. Steps that set both a
// message and a file are rejected with an error.
func readInputData(filename string) ([]element, error) {
	buf, err := os.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	var steps []element
	if err := json.Unmarshal(buf, &steps); err != nil {
		return nil, err
	}

	baseDir := filepath.Dir(filename)
	for idx := range steps {
		current := &steps[idx]
		switch {
		case current.Message != "" && current.File != "":
			return nil, errors.New("both message and file set in sequence")
		case current.Message != "":
			current.raw = []byte(current.Message)
		case current.File != "":
			payload, err := os.ReadFile(filepath.Join(baseDir, current.File))
			if err != nil {
				return nil, err
			}
			current.raw = payload
		}
	}

	return steps, nil
}
func createClient(endpoint string, addr net.Addr, tlsCfg *tls.Config) (net.Conn, error) {
// Determine the protocol in a crude fashion
parts := strings.SplitN(endpoint, "://", 2)

View File

@ -0,0 +1,81 @@
package socket_listener
import (
	"bufio"
	"bytes"
	"fmt"
)
// scanNull is a split function for bufio.Scanner that cuts the input at null
// bytes. The null byte itself is consumed but not part of the token. Data
// remaining at EOF without a terminating null byte is returned as final token.
func scanNull(data []byte, atEOF bool) (advance int, token []byte, err error) {
	pos := bytes.IndexByte(data, 0)
	switch {
	case pos >= 0:
		// Complete message available, skip the terminator.
		return pos + 1, data[:pos], nil
	case !atEOF, len(data) == 0:
		// Either more data may follow or there is nothing left to emit.
		return 0, nil, nil
	}
	// Unterminated rest at the end of the stream.
	return len(data), data, nil
}
// createScanDelimiter returns a split function for bufio.Scanner that cuts
// the input at every occurrence of the given delimiter byte-sequence. The
// delimiter is consumed but not part of the token. Data remaining at EOF
// without a trailing delimiter is returned as final token.
func createScanDelimiter(delimiter []byte) bufio.SplitFunc {
	sepLen := len(delimiter)

	return func(data []byte, atEOF bool) (int, []byte, error) {
		remaining := len(data)
		if atEOF && remaining == 0 {
			return 0, nil, nil
		}

		pos := bytes.Index(data, delimiter)
		if pos < 0 {
			if !atEOF {
				// Request more data.
				return 0, nil, nil
			}
			return remaining, data, nil
		}
		return pos + sepLen, data[:pos], nil
	}
}
// createScanFixedLength returns a split function for bufio.Scanner that cuts
// the input into tokens of exactly length bytes. Data remaining at EOF that
// is shorter than length is returned as final token.
func createScanFixedLength(length int) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		available := len(data)
		switch {
		case atEOF && available == 0:
			return 0, nil, nil
		case available >= length:
			return length, data[:length], nil
		case atEOF:
			return available, data, nil
		default:
			// Request more data.
			return 0, nil, nil
		}
	}
}
// createScanVariableLength returns a split function for bufio.Scanner that
// cuts the input according to a length field contained in the data.
//
// The length field is located via spec.Offset and spec.Bytes and converted to
// a number by spec.converter. A message ends at the converted length plus
// spec.HeaderLength; the first spec.HeaderLength bytes are not part of the
// returned token. Data remaining at EOF that does not form a complete message
// is returned unmodified as final token.
//
// The length is taken from the received data and is therefore validated: a
// negative value (e.g. a wide field overflowing int) or a message end at
// position zero yields an error instead of an out-of-range slice or a token
// that never advances the input.
func createScanVariableLength(spec lengthFieldSpec) bufio.SplitFunc {
	minlen := int(spec.Offset)
	minlen += int(spec.Bytes)
	headerLen := int(spec.HeaderLength)

	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		}

		dataLen := len(data)
		if dataLen >= minlen {
			// Extract the length field and convert it to a number
			lf := data[spec.Offset : spec.Offset+spec.Bytes]
			length := spec.converter(lf)
			start := headerLen
			end := length + headerLen

			// Refuse lengths that cannot describe a message; "end <= 0"
			// also covers an overflow of the addition above.
			if length < 0 || end <= 0 {
				return 0, nil, fmt.Errorf("invalid message length %d in length field", length)
			}

			// If we have enough data return it without the header
			if end <= dataLen {
				return end, data[start:end], nil
			}
		}
		if atEOF {
			return len(data), data, nil
		}

		// Request more data.
		return 0, nil, nil
	}
}

View File

@ -27,6 +27,7 @@ type streamListener struct {
MaxConnections int
ReadTimeout config.Duration
KeepAlivePeriod *config.Duration
Splitter bufio.SplitFunc
Parser telegraf.Parser
Log telegraf.Logger
@ -202,6 +203,7 @@ func (l *streamListener) read(acc telegraf.Accumulator, conn net.Conn) error {
timeout := time.Duration(l.ReadTimeout)
scanner := bufio.NewScanner(decoder)
scanner.Split(l.Splitter)
for {
// Set the read deadline, if any, then start reading. The read
// will accept the deadline and return if no or insufficient data

View File

@ -0,0 +1 @@
parsing error: metric parse error

View File

@ -0,0 +1,3 @@
test,foo=bar v=1i 123456789
test,foo=baz v=2i 123456790
test,foo=zab v=3i 123456791

View File

@ -0,0 +1,14 @@
[
{
"message": "test,foo=bar v=1i 123456789\n"
},
{
"message": "wild boyz\n"
},
{
"message": "test,foo=baz v=2i 123456790\n"
},
{
"message": "test,foo=zab v=3i 123456791\n"
}
]

View File

@ -0,0 +1,3 @@
# Test with broken line protocol lines
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"

View File

@ -0,0 +1,3 @@
test,foo=bar v=1i 123456789
test,foo=baz v=2i 123456790
test,foo=zab v=3i 123456791

View File

@ -0,0 +1,5 @@
[
{
"file": "message.bin"
}
]

View File

@ -0,0 +1,5 @@
# Test splitting at a custom delimiter byte-sequence
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
splitting_strategy = "delimiter"
splitting_delimiter = "0xBE 0xEF 0x00"

View File

@ -0,0 +1,4 @@
socket_listener value="foobar"
socket_listener value="fuubar"
socket_listener value="foobaz"
socket_listener value="123456"

View File

@ -0,0 +1,11 @@
[
{
"message": "foobarfuubarfoobaz"
},
{
"message": "123"
},
{
"message": "456"
}
]

View File

@ -0,0 +1,7 @@
# Test splitting into messages of fixed length
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
splitting_strategy = "fixed length"
splitting_length = 6
data_format = "value"
data_type = "string"

View File

@ -0,0 +1,3 @@
test,foo=bar v=1i 123456789
test,foo=baz v=2i 123456790
test,foo=zab v=3i 123456791

View File

@ -0,0 +1,5 @@
[
{
"message": "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\ntest,foo=zab v=3i 123456791\n"
}
]

View File

@ -0,0 +1,4 @@
# Test with the explicitly configured "newline" splitting strategy
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
splitting_strategy = "newline"

View File

@ -0,0 +1,3 @@
test,foo=bar v=1i 123456789
test,foo=baz v=2i 123456790
test,foo=zab v=3i 123456791

View File

@ -0,0 +1,5 @@
[
{
"file": "message.bin"
}
]

View File

@ -0,0 +1,4 @@
# Test splitting at null bytes
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
splitting_strategy = "null"

View File

@ -0,0 +1,4 @@
test,foo=bar v=1i 123456789
test,foo=baz v=2i 123456790
test,foo=zab v=3i 123456791
test,foo=bie v=42i,temp=24.1 1665510032

View File

@ -0,0 +1 @@
4.1 1665510032

View File

@ -0,0 +1,20 @@
[
{
"file": "message_1.bin"
},
{
"file": "message_2.bin"
},
{
"file": "message_3.bin"
},
{
"file": "message_4.bin"
},
{
"wait": "100ms"
},
{
"file": "message_5.bin"
}
]

View File

@ -0,0 +1,5 @@
# Test splitting by a length field contained in the data
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
splitting_strategy = "variable length"
splitting_length_field = {offset = 1, bytes = 2, endianness = "be", header_length = 3}

View File

@ -0,0 +1,5 @@
[
{
"wait": "1s"
}
]

View File

@ -0,0 +1,4 @@
# Test with a short read timeout
[[inputs.socket_listener]]
service_address = "tcp://127.0.0.1:0"
read_timeout = "100ms"