diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index a5a79468f..8e2b6f681 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -1,297 +1,202 @@ package socket_listener import ( - "bytes" "crypto/tls" - "io" - "log" "net" "os" "runtime" + "strings" "testing" "time" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/wlog" ) var pki = testutil.NewPKI("../../../testutil/pki") -// prepareLog is a helper function to ensure no data is written to log. -// Should be called at the start of the test, and returns a function which should run at the end. -func prepareLog(t *testing.T) func() { - buf := bytes.NewBuffer(nil) - log.SetOutput(wlog.NewWriter(buf)) +func TestSocketListener(t *testing.T) { + messages := [][]byte{ + []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"), + []byte("test,foo=zab v=3i 123456791\n"), + } + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"v": int64(1)}, + time.Unix(0, 123456789), + ), + metric.New( + "test", + map[string]string{"foo": "baz"}, + map[string]interface{}{"v": int64(2)}, + time.Unix(0, 123456790), + ), + metric.New( + "test", + map[string]string{"foo": "zab"}, + map[string]interface{}{"v": int64(3)}, + time.Unix(0, 123456791), + ), + } - level := wlog.WARN - wlog.SetLevel(level) + tests := []struct { + name string + schema string + buffersize config.Size + encoding string + }{ + { + name: "TCP", + schema: "tcp", + buffersize: config.Size(1024), + }, + { + name: "TCP with TLS", + schema: "tcp+tls", + }, + { + name: "TCP with gzip encoding", + schema: "tcp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "UDP", + schema: "udp", + buffersize: config.Size(1024), + }, + { + name: "UDP with gzip encoding", + schema: "udp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "unix socket", + schema: "unix", + buffersize: config.Size(1024), + }, + { + name: "unix socket with TLS", + schema: "unix+tls", + }, + { + name: "unix socket with gzip encoding", + schema: "unix", + encoding: "gzip", + }, + { + name: "unixgram socket", + schema: "unixgram", + buffersize: config.Size(1024), + }, + } - return func() { - log.SetOutput(os.Stderr) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + proto := strings.TrimSuffix(tt.schema, "+tls") - for { - line, err := buf.ReadBytes('\n') - if err != nil { - require.Equal(t, io.EOF, err) - break + // Prepare the address and socket if needed + var serverAddr string + switch proto { + case "tcp", "udp": + serverAddr = "127.0.0.1:0" + case "unix", "unixgram": + if runtime.GOOS == "windows" { + t.Skip("Skipping on Windows, as unixgram sockets are not supported") + } + + // Create a socket + sock, err := os.CreateTemp("", "sock-") + require.NoError(t, err) + defer sock.Close() + defer os.Remove(sock.Name()) + serverAddr = sock.Name() } - require.Empty(t, string(line), "log not empty") - } + + // Setup plugin according to test specification + plugin := &SocketListener{ + Log: &testutil.Logger{}, + ServiceAddress: proto + "://" + serverAddr, + ContentEncoding: tt.encoding, + ReadBufferSize: tt.buffersize, + } + if strings.HasSuffix(tt.schema, "tls") { + plugin.ServerConfig = *pki.TLSServerConfig() + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Setup the client for submitting data + var client net.Conn + switch tt.schema { + case "tcp": + var err error + addr := plugin.Closer.(net.Listener).Addr().String() + client, err = net.Dial("tcp", addr) + require.NoError(t, err) + case "tcp+tls": + addr := plugin.Closer.(net.Listener).Addr().String() + tlscfg, err := pki.TLSClientConfig().TLSConfig() + require.NoError(t, err) + client, err = tls.Dial("tcp", addr, tlscfg) + require.NoError(t, err) + case "udp": + var err error + addr := plugin.Closer.(net.PacketConn).LocalAddr().String() + client, err = net.Dial("udp", addr) + require.NoError(t, err) + case "unix": + var err error + client, err = net.Dial("unix", serverAddr) + require.NoError(t, err) + case "unix+tls": + tlscfg, err := pki.TLSClientConfig().TLSConfig() + require.NoError(t, err) + tlscfg.InsecureSkipVerify = true + client, err = tls.Dial("unix", serverAddr, tlscfg) + require.NoError(t, err) + case "unixgram": + var err error + client, err = net.Dial("unixgram", serverAddr) + require.NoError(t, err) + default: + require.Failf(t, "schema %q not supported in test", tt.schema) + } + + // Send the data with the correct encoding + encoder, err := internal.NewContentEncoder(tt.encoding) + require.NoError(t, err) + + for i, msg := range messages { + m, err := encoder.Encode(msg) + require.NoErrorf(t, err, "encoding failed for msg %d", i) + _, err = client.Write(m) + require.NoErrorf(t, err, "sending msg %d failed", i) + } + + // Test the resulting metrics and compare against expected results + require.Eventually(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics") + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + }) } } - -func TestSocketListener_tcp_tls(t *testing.T) { - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "tcp://127.0.0.1:0" - sl.ServerConfig = *pki.TLSServerConfig() - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - tlsCfg, err := pki.TLSClientConfig().TLSConfig() - require.NoError(t, err) - - secureClient, err := tls.Dial("tcp", sl.Closer.(net.Listener).Addr().String(), tlsCfg) - require.NoError(t, err) - - testSocketListener(t, sl, secureClient) -} - -func TestSocketListener_unix_tls(t *testing.T) { - sock := testutil.TempSocket(t) - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "unix://" + sock - sl.ServerConfig = *pki.TLSServerConfig() - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - tlsCfg, err := pki.TLSClientConfig().TLSConfig() - require.NoError(t, err) - tlsCfg.InsecureSkipVerify = true - - secureClient, err := tls.Dial("unix", sock, tlsCfg) - require.NoError(t, err) - - testSocketListener(t, sl, secureClient) -} - -func TestSocketListener_tcp(t *testing.T) { - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "tcp://127.0.0.1:0" - sl.ReadBufferSize = config.Size(1024) - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String()) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func TestSocketListener_udp(t *testing.T) { - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "udp://127.0.0.1:0" - sl.ReadBufferSize = config.Size(1024) - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func TestSocketListener_unix(t *testing.T) { - sock := testutil.TempSocket(t) - - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - f, _ := os.Create(sock) - require.NoError(t, f.Close()) - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "unix://" + sock - sl.ReadBufferSize = config.Size(1024) - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("unix", sock) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func TestSocketListener_unixgram(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Skipping on Windows, as unixgram sockets are not supported") - } - - sock := testutil.TempSocket(t) - - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - f, err := os.Create(sock) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, f.Close()) }) - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "unixgram://" + sock - sl.ReadBufferSize = config.Size(1024) - - acc := &testutil.Accumulator{} - err = sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("unixgram", sock) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func TestSocketListenerDecode_tcp(t *testing.T) { - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "tcp://127.0.0.1:0" - sl.ReadBufferSize = config.Size(1024) - sl.ContentEncoding = "gzip" - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String()) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func TestSocketListenerDecode_udp(t *testing.T) { - testEmptyLog := prepareLog(t) - defer testEmptyLog() - - sl := &SocketListener{} - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - sl.SetParser(parser) - sl.Log = testutil.Logger{} - sl.ServiceAddress = "udp://127.0.0.1:0" - sl.ReadBufferSize = config.Size(1024) - sl.ContentEncoding = "gzip" - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) - require.NoError(t, err) - - testSocketListener(t, sl, client) -} - -func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { - mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n") - mstr3 := []byte("test,foo=zab v=3i 123456791\n") - - if sl.ContentEncoding == "gzip" { - encoder, err := internal.NewContentEncoder(sl.ContentEncoding) - require.NoError(t, err) - mstr12, err = encoder.Encode(mstr12) - require.NoError(t, err) - - encoder, err = internal.NewContentEncoder(sl.ContentEncoding) - require.NoError(t, err) - mstr3, err = encoder.Encode(mstr3) - require.NoError(t, err) - } - - _, err := client.Write(mstr12) - require.NoError(t, err) - _, err = client.Write(mstr3) - require.NoError(t, err) - acc := sl.Accumulator.(*testutil.Accumulator) - - acc.Wait(3) - acc.Lock() - m1 := acc.Metrics[0] - m2 := acc.Metrics[1] - m3 := acc.Metrics[2] - acc.Unlock() - - require.Equal(t, "test", m1.Measurement) - require.Equal(t, map[string]string{"foo": "bar"}, m1.Tags) - require.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields) - require.True(t, time.Unix(0, 123456789).Equal(m1.Time)) - - require.Equal(t, "test", m2.Measurement) - require.Equal(t, map[string]string{"foo": "baz"}, m2.Tags) - require.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields) - require.True(t, time.Unix(0, 123456790).Equal(m2.Time)) - - require.Equal(t, "test", m3.Measurement) - require.Equal(t, map[string]string{"foo": "zab"}, m3.Tags) - require.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields) - require.True(t, time.Unix(0, 123456791).Equal(m3.Time)) -}