diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index a6b233d80..73ef60fd7 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -2,6 +2,7 @@ package instrumental import ( "bufio" + "errors" "io" "net" "net/textproto" @@ -88,63 +89,154 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) int { go func() { defer wg.Done() + defer tcpServer.Close() + conn, err := tcpServer.Accept() - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } + defer func() { + if err := conn.Close(); err != nil { + t.Error(err) + } + }() + err = conn.SetDeadline(time.Now().Add(1 * time.Second)) - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } + reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) + helloExpected := "hello version go/telegraf/1.1" hello, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "hello version go/telegraf/1.1", hello) - auth, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "authenticate abc123token", auth) - _, err = conn.Write([]byte("ok\nok\n")) - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } else if hello != helloExpected { + t.Errorf("expected %q, got %q", helloExpected, hello) + return + } + authExpected := "authenticate abc123token" + auth, err := tp.ReadLine() + if err != nil { + t.Error(err) + return + } else if auth != authExpected { + t.Errorf("expected %q, got %q", authExpected, auth) + return + } + + _, err = conn.Write([]byte("ok\nok\n")) + if err != nil { + t.Error(err) + return + } + + data1Expected := "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000" data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + if err != nil { + t.Error(err) + return + } else if data1 != data1Expected { + t.Errorf("expected %q, got %q", data1Expected, data1) + return + } + + data2Expected := "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000" data2, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + if err != nil { + t.Error(err) + return + } else if data2 != data2Expected { + t.Errorf("expected %q, got %q", data2Expected, data2) + return + } conn, err = tcpServer.Accept() - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } + err = conn.SetDeadline(time.Now().Add(1 * time.Second)) - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } + reader = bufio.NewReader(conn) tp = textproto.NewReader(reader) + helloExpected = "hello version go/telegraf/1.1" hello, err = tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "hello version go/telegraf/1.1", hello) + if err != nil { + t.Error(err) + return + } else if hello != helloExpected { + t.Errorf("expected %q, got %q", helloExpected, hello) + return + } + + authExpected = "authenticate abc123token" auth, err = tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "authenticate abc123token", auth) + if err != nil { + t.Error(err) + return + } else if auth != authExpected { + t.Errorf("expected %q, got %q", authExpected, auth) + return + } + _, err = conn.Write([]byte("ok\nok\n")) - require.NoError(t, err) + if err != nil { + t.Error(err) + return + } + data3Expected := "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000" data3, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) + if err != nil { + t.Error(err) + return + } else if data3 != data3Expected { + t.Errorf("expected %q, got %q", data3Expected, data3) + return + } + data4Expected := "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000" data4, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4) + if err != nil { + t.Error(err) + return + } else if data4 != data4Expected { + t.Errorf("expected %q, got %q", data4Expected, data4) + return + } + data5Expected := "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000" data5, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5) + if err != nil { + t.Error(err) + return + } else if data5 != data5Expected { + t.Errorf("expected %q, got %q", data5Expected, data5) + return + } + data6Expected := "" data6, err := tp.ReadLine() - require.ErrorIs(t, err, io.EOF) - require.Equal(t, "", data6) - - err = conn.Close() - require.NoError(t, err) + if !errors.Is(err, io.EOF) { + t.Error(err) + return + } else if data6 != data6Expected { + t.Errorf("expected %q, got %q", data6Expected, data6) + return + } }() return tcpServer.Addr().(*net.TCPAddr).Port diff --git a/plugins/parsers/influx/influx_upstream/parser_test.go b/plugins/parsers/influx/influx_upstream/parser_test.go index ce04231c6..d9ad173bd 100644 --- a/plugins/parsers/influx/influx_upstream/parser_test.go +++ b/plugins/parsers/influx/influx_upstream/parser_test.go @@ -1006,9 +1006,11 @@ func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { parser := NewStreamParser(r) parser.SetTimeFunc(DefaultTime) + ch := make(chan error) go func() { _, err := w.Write([]byte("metric value=1\nmetric2 value=1\n")) - require.NoError(t, err) + ch <- err + close(ch) }() _, err := parser.Next() @@ -1017,6 +1019,9 @@ func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { // should not block on second read _, err = parser.Next() require.NoError(t, err) + + err = <-ch + require.NoError(t, err) } const benchmarkData = `benchmark,tags_host=myhost,tags_platform=python,tags_sdkver=3.11.5 value=5 1653643421 diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 9de864c53..f1da1dba4 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -973,9 +973,11 @@ func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { parser := NewStreamParser(r) parser.SetTimeFunc(DefaultTime) + ch := make(chan error) go func() { _, err := w.Write([]byte("metric value=1\nmetric2 value=1\n")) - require.NoError(t, err) + ch <- err + close(ch) }() _, err := parser.Next() @@ -984,6 +986,9 @@ func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { // should not block on second read _, err = parser.Next() require.NoError(t, err) + + err = <-ch + require.NoError(t, err) } const benchmarkData = `benchmark,tags_host=myhost,tags_platform=python,tags_sdkver=3.11.5 value=5 1653643421 diff --git a/plugins/processors/ifname/ifname_test.go b/plugins/processors/ifname/ifname_test.go index 622147a61..618cb3a45 100644 --- a/plugins/processors/ifname/ifname_test.go +++ b/plugins/processors/ifname/ifname_test.go @@ -15,6 +15,12 @@ import ( "github.com/influxdata/telegraf/testutil" ) +type item struct { + entry nameMap + age time.Duration + err error +} + func TestTableIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -119,20 +125,26 @@ func TestGetMap(t *testing.T) { // Remote call should happen the first time getMap runs require.Equal(t, int32(1), remoteCalls) - var wg sync.WaitGroup const thMax = 3 + ch := make(chan item, thMax) + var wg sync.WaitGroup for th := 0; th < thMax; th++ { wg.Add(1) go func() { defer wg.Done() m, age, err := d.getMap("agent") - require.NoError(t, err) - require.NotZero(t, age) // Age is nonzero when map comes from cache - require.Equal(t, expected, m) + ch <- item{entry: m, age: age, err: err} }() } wg.Wait() + close(ch) + + for entry := range ch { + require.NoError(t, entry.err) + require.NotZero(t, entry.age) // Age is nonzero when map comes from cache + require.Equal(t, expected, entry.entry) + } // Remote call should not happen subsequent times getMap runs require.Equal(t, int32(1), remoteCalls) diff --git a/plugins/processors/reverse_dns/rdnscache_test.go b/plugins/processors/reverse_dns/rdnscache_test.go index eff64c2e6..ce59e1dd2 100644 --- a/plugins/processors/reverse_dns/rdnscache_test.go +++ b/plugins/processors/reverse_dns/rdnscache_test.go @@ -46,25 +46,24 @@ func TestParallelReverseDNSLookup(t *testing.T) { defer d.Stop() d.Resolver = &localResolver{} - var answer1 []string - var answer2 []string + var answer1, answer2 []string + var err1, err2 error wg := &sync.WaitGroup{} wg.Add(2) go func() { - answer, err := d.Lookup("127.0.0.1") - require.NoError(t, err) - answer1 = answer + answer1, err1 = d.Lookup("127.0.0.1") wg.Done() }() go func() { - answer, err := d.Lookup("127.0.0.1") - require.NoError(t, err) - answer2 = answer + answer2, err2 = d.Lookup("127.0.0.1") wg.Done() }() wg.Wait() + require.NoError(t, err1) + require.NoError(t, err2) + t.Log(answer1) t.Log(answer2)