chore(inputs.socket_listener): Reorganize test to be table-based. (#11970)

This commit is contained in:
Sven Rebhan 2022-10-11 17:13:50 +02:00 committed by GitHub
parent f5dad77d93
commit a94da1d235
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 176 additions and 271 deletions

View File

@ -1,297 +1,202 @@
package socket_listener
import (
"bytes"
"crypto/tls"
"io"
"log"
"net"
"os"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/wlog"
)
var pki = testutil.NewPKI("../../../testutil/pki")
// prepareLog is a helper function to ensure no data is written to log.
// Should be called at the start of the test, and returns a function which should run at the end.
func prepareLog(t *testing.T) func() {
buf := bytes.NewBuffer(nil)
log.SetOutput(wlog.NewWriter(buf))
func TestSocketListener(t *testing.T) {
messages := [][]byte{
[]byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"),
[]byte("test,foo=zab v=3i 123456791\n"),
}
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{"v": int64(1)},
time.Unix(0, 123456789),
),
metric.New(
"test",
map[string]string{"foo": "baz"},
map[string]interface{}{"v": int64(2)},
time.Unix(0, 123456790),
),
metric.New(
"test",
map[string]string{"foo": "zab"},
map[string]interface{}{"v": int64(3)},
time.Unix(0, 123456791),
),
}
level := wlog.WARN
wlog.SetLevel(level)
tests := []struct {
name string
schema string
buffersize config.Size
encoding string
}{
{
name: "TCP",
schema: "tcp",
buffersize: config.Size(1024),
},
{
name: "TCP with TLS",
schema: "tcp+tls",
},
{
name: "TCP with gzip encoding",
schema: "tcp",
buffersize: config.Size(1024),
encoding: "gzip",
},
{
name: "UDP",
schema: "udp",
buffersize: config.Size(1024),
},
{
name: "UDP with gzip encoding",
schema: "udp",
buffersize: config.Size(1024),
encoding: "gzip",
},
{
name: "unix socket",
schema: "unix",
buffersize: config.Size(1024),
},
{
name: "unix socket with TLS",
schema: "unix+tls",
},
{
name: "unix socket with gzip encoding",
schema: "unix",
encoding: "gzip",
},
{
name: "unixgram socket",
schema: "unixgram",
buffersize: config.Size(1024),
},
}
return func() {
log.SetOutput(os.Stderr)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
proto := strings.TrimSuffix(tt.schema, "+tls")
for {
line, err := buf.ReadBytes('\n')
if err != nil {
require.Equal(t, io.EOF, err)
break
// Prepare the address and socket if needed
var serverAddr string
switch proto {
case "tcp", "udp":
serverAddr = "127.0.0.1:0"
case "unix", "unixgram":
if runtime.GOOS == "windows" {
t.Skip("Skipping on Windows, as unixgram sockets are not supported")
}
// Create a socket
sock, err := os.CreateTemp("", "sock-")
require.NoError(t, err)
defer sock.Close()
defer os.Remove(sock.Name())
serverAddr = sock.Name()
}
require.Empty(t, string(line), "log not empty")
}
// Setup plugin according to test specification
plugin := &SocketListener{
Log: &testutil.Logger{},
ServiceAddress: proto + "://" + serverAddr,
ContentEncoding: tt.encoding,
ReadBufferSize: tt.buffersize,
}
if strings.HasSuffix(tt.schema, "tls") {
plugin.ServerConfig = *pki.TLSServerConfig()
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Setup the client for submitting data
var client net.Conn
switch tt.schema {
case "tcp":
var err error
addr := plugin.Closer.(net.Listener).Addr().String()
client, err = net.Dial("tcp", addr)
require.NoError(t, err)
case "tcp+tls":
addr := plugin.Closer.(net.Listener).Addr().String()
tlscfg, err := pki.TLSClientConfig().TLSConfig()
require.NoError(t, err)
client, err = tls.Dial("tcp", addr, tlscfg)
require.NoError(t, err)
case "udp":
var err error
addr := plugin.Closer.(net.PacketConn).LocalAddr().String()
client, err = net.Dial("udp", addr)
require.NoError(t, err)
case "unix":
var err error
client, err = net.Dial("unix", serverAddr)
require.NoError(t, err)
case "unix+tls":
tlscfg, err := pki.TLSClientConfig().TLSConfig()
require.NoError(t, err)
tlscfg.InsecureSkipVerify = true
client, err = tls.Dial("unix", serverAddr, tlscfg)
require.NoError(t, err)
case "unixgram":
var err error
client, err = net.Dial("unixgram", serverAddr)
require.NoError(t, err)
default:
require.Failf(t, "schema %q not supported in test", tt.schema)
}
// Send the data with the correct encoding
encoder, err := internal.NewContentEncoder(tt.encoding)
require.NoError(t, err)
for i, msg := range messages {
m, err := encoder.Encode(msg)
require.NoErrorf(t, err, "encoding failed for msg %d", i)
_, err = client.Write(m)
require.NoErrorf(t, err, "sending msg %d failed", i)
}
// Test the resulting metrics and compare against expected results
require.Eventually(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "did not receive metrics")
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
})
}
}
func TestSocketListener_tcp_tls(t *testing.T) {
testEmptyLog := prepareLog(t)
defer testEmptyLog()
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ServerConfig = *pki.TLSServerConfig()
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
tlsCfg, err := pki.TLSClientConfig().TLSConfig()
require.NoError(t, err)
secureClient, err := tls.Dial("tcp", sl.Closer.(net.Listener).Addr().String(), tlsCfg)
require.NoError(t, err)
testSocketListener(t, sl, secureClient)
}
func TestSocketListener_unix_tls(t *testing.T) {
sock := testutil.TempSocket(t)
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unix://" + sock
sl.ServerConfig = *pki.TLSServerConfig()
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
tlsCfg, err := pki.TLSClientConfig().TLSConfig()
require.NoError(t, err)
tlsCfg.InsecureSkipVerify = true
secureClient, err := tls.Dial("unix", sock, tlsCfg)
require.NoError(t, err)
testSocketListener(t, sl, secureClient)
}
func TestSocketListener_tcp(t *testing.T) {
testEmptyLog := prepareLog(t)
defer testEmptyLog()
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func TestSocketListener_udp(t *testing.T) {
testEmptyLog := prepareLog(t)
defer testEmptyLog()
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func TestSocketListener_unix(t *testing.T) {
sock := testutil.TempSocket(t)
testEmptyLog := prepareLog(t)
defer testEmptyLog()
f, _ := os.Create(sock)
require.NoError(t, f.Close())
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unix://" + sock
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("unix", sock)
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func TestSocketListener_unixgram(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping on Windows, as unixgram sockets are not supported")
}
sock := testutil.TempSocket(t)
testEmptyLog := prepareLog(t)
defer testEmptyLog()
f, err := os.Create(sock)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unixgram://" + sock
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err = sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("unixgram", sock)
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func TestSocketListenerDecode_tcp(t *testing.T) {
testEmptyLog := prepareLog(t)
defer testEmptyLog()
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
sl.ContentEncoding = "gzip"
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func TestSocketListenerDecode_udp(t *testing.T) {
testEmptyLog := prepareLog(t)
defer testEmptyLog()
sl := &SocketListener{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
sl.ContentEncoding = "gzip"
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
require.NoError(t, err)
testSocketListener(t, sl, client)
}
func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
mstr3 := []byte("test,foo=zab v=3i 123456791\n")
if sl.ContentEncoding == "gzip" {
encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
require.NoError(t, err)
mstr12, err = encoder.Encode(mstr12)
require.NoError(t, err)
encoder, err = internal.NewContentEncoder(sl.ContentEncoding)
require.NoError(t, err)
mstr3, err = encoder.Encode(mstr3)
require.NoError(t, err)
}
_, err := client.Write(mstr12)
require.NoError(t, err)
_, err = client.Write(mstr3)
require.NoError(t, err)
acc := sl.Accumulator.(*testutil.Accumulator)
acc.Wait(3)
acc.Lock()
m1 := acc.Metrics[0]
m2 := acc.Metrics[1]
m3 := acc.Metrics[2]
acc.Unlock()
require.Equal(t, "test", m1.Measurement)
require.Equal(t, map[string]string{"foo": "bar"}, m1.Tags)
require.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields)
require.True(t, time.Unix(0, 123456789).Equal(m1.Time))
require.Equal(t, "test", m2.Measurement)
require.Equal(t, map[string]string{"foo": "baz"}, m2.Tags)
require.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields)
require.True(t, time.Unix(0, 123456790).Equal(m2.Time))
require.Equal(t, "test", m3.Measurement)
require.Equal(t, map[string]string{"foo": "zab"}, m3.Tags)
require.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields)
require.True(t, time.Unix(0, 123456791).Equal(m3.Time))
}