fix(inputs.socket_listener): Fix race in tests (#13300)

This commit is contained in:
Sven Rebhan 2023-05-22 14:31:22 +02:00 committed by GitHub
parent b08a2bb324
commit 759691a1fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 54 additions and 21 deletions

View File

@ -136,9 +136,8 @@ func TestSocketListener(t *testing.T) {
}
// Setup plugin according to test specification
logger := &testutil.CaptureLogger{}
plugin := &SocketListener{
Log: logger,
Log: &testutil.Logger{},
ServiceAddress: proto + "://" + serverAddr,
ContentEncoding: tt.encoding,
ReadBufferSize: tt.buffersize,
@ -190,29 +189,63 @@ func TestSocketListener(t *testing.T) {
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
if sl, ok := plugin.listener.(*streamListener); ok {
require.NotEmpty(t, sl.connections)
}
plugin.Stop()
// Make sure we clear out old messages
logger.Clear()
if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
require.Equal(t, err, io.EOF)
}
require.Empty(t, logger.Errors())
require.Empty(t, logger.Warnings())
})
}
}
func TestSocketListenerStream(t *testing.T) {
logger := &testutil.CaptureLogger{}
plugin := &SocketListener{
Log: logger,
ServiceAddress: "tcp://127.0.0.1:0",
ReadBufferSize: 1024,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
addr := plugin.listener.addr()
// Create a noop client
client, err := createClient(plugin.ServiceAddress, addr, nil)
require.NoError(t, err)
_, err = client.Write([]byte("test value=42i\n"))
require.NoError(t, err)
require.Eventually(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "did not receive metric")
// This has to be a stream-listener...
listener, ok := plugin.listener.(*streamListener)
require.True(t, ok)
listener.Lock()
conns := len(listener.connections)
listener.Unlock()
require.NotZero(t, conns)
plugin.Stop()
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
require.Equal(t, err, io.EOF)
require.Empty(t, logger.Errors())
require.Empty(t, logger.Warnings())
}
func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")