From 438653591b5bc6bc5eb78bf08a5a639d9cd9a893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Wed, 9 Oct 2024 16:33:36 +0200 Subject: [PATCH] chore(linters): Fix findings found by `testifylint`: `go-require` for `plugins/outputs` (#15985) --- plugins/outputs/file/file_test.go | 18 +- plugins/outputs/graphite/graphite_test.go | 228 ++++++------------ plugins/outputs/graylog/graylog_test_linux.go | 23 +- .../opentelemetry/opentelemetry_test.go | 6 +- .../postgresql/postgresql_bench_test.go | 3 +- plugins/outputs/postgresql/postgresql_test.go | 100 +++++--- .../outputs/postgresql/table_manager_test.go | 65 +++-- .../outputs/postgresql/table_source_test.go | 30 ++- 8 files changed, 248 insertions(+), 225 deletions(-) diff --git a/plugins/outputs/file/file_test.go b/plugins/outputs/file/file_test.go index 66249a21a..640b9e76d 100644 --- a/plugins/outputs/file/file_test.go +++ b/plugins/outputs/file/file_test.go @@ -234,6 +234,11 @@ func TestFileBoth(t *testing.T) { require.NoError(t, err) } +type erroredString struct { + str string + err error +} + func TestFileStdout(t *testing.T) { // keep backup of the real stdout old := os.Stdout @@ -261,13 +266,17 @@ func TestFileStdout(t *testing.T) { err = f.Close() require.NoError(t, err) - outC := make(chan string) + outC := make(chan erroredString) // copy the output in a separate goroutine so printing can't block indefinitely go func() { var buf bytes.Buffer _, err := io.Copy(&buf, r) - require.NoError(t, err) - outC <- buf.String() + if err != nil { + outC <- erroredString{err: err} + return + } + + outC <- erroredString{str: buf.String()} }() // back to normal state @@ -278,7 +287,8 @@ func TestFileStdout(t *testing.T) { os.Stdout = old out := <-outC - require.Equal(t, expNewFile, out) + require.NoError(t, out.err) + require.Equal(t, expNewFile, out.str) } func createFile(t *testing.T) *os.File { diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index edbcad25d..93ea93bd9 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -77,18 +77,7 @@ func TestGraphiteReconnect(t *testing.T) { t.Log("Writing metric after server came up, we expect automatic reconnect on write without calling Connect() again") require.NoError(t, g.Write([]telegraf.Metric{m})) - go func() { - defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) - }() + simulateTCPServer(t, &wg, tcpServer, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000") wg.Wait() require.NoError(t, g.Close()) @@ -99,7 +88,7 @@ func TestGraphiteOK(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1(t, &wg) + tcpServer1(t, &wg) // Init plugin g := Graphite{ @@ -145,7 +134,7 @@ func TestGraphiteOK(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2(t, &wg2) + tcpServer2(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -163,18 +152,8 @@ func TestGraphiteStrictRegex(t *testing.T) { t.Log("Starting server") tcpServer, err := net.Listen("tcp", "127.0.0.1:12042") require.NoError(t, err) - go func() { - defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, &wg, tcpServer, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000") m := metric.New( "mymeasurement", @@ -204,7 +183,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1(t, &wg) + tcpServer1(t, &wg) // Init plugin g := Graphite{ @@ -251,7 +230,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2(t, &wg2) + tcpServer2(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -268,7 +247,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1(t, &wg) + tcpServer1(t, &wg) // Init plugin g := Graphite{ @@ -315,7 +294,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2(t, &wg2) + tcpServer2(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -332,7 +311,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1WithMultipleTemplates(t, &wg) + tcpServer1WithMultipleTemplates(t, &wg) // Init plugin g := Graphite{ @@ -383,7 +362,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2WithMultipleTemplates(t, &wg2) + tcpServer2WithMultipleTemplates(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -400,7 +379,7 @@ func TestGraphiteOkWithTags(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1WithTags(t, &wg) + tcpServer1WithTags(t, &wg) // Init plugin g := Graphite{ @@ -447,7 +426,7 @@ func TestGraphiteOkWithTags(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2WithTags(t, &wg2) + tcpServer2WithTags(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -464,7 +443,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1WithTags(t, &wg) + tcpServer1WithTags(t, &wg) // Init plugin g := Graphite{ @@ -512,7 +491,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2WithTags(t, &wg2) + tcpServer2WithTags(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -529,7 +508,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - TCPServer1WithTagsSeparatorUnderscore(t, &wg) + tcpServer1WithTagsSeparatorUnderscore(t, &wg) // Init plugin g := Graphite{ @@ -577,7 +556,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { var wg2 sync.WaitGroup // Start TCP server wg2.Add(1) - TCPServer2WithTagsSeparatorUnderscore(t, &wg2) + tcpServer2WithTagsSeparatorUnderscore(t, &wg2) // Write but expect an error, but reconnect err3 := g.Write(metrics2) t.Log("Finished writing second data, it should have reconnected automatically") @@ -695,150 +674,97 @@ func query(url string, data interface{}) error { return json.Unmarshal(raw, &data) } -func TCPServer1(t *testing.T, wg *sync.WaitGroup) { - tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") - require.NoError(t, err) +func simulateTCPServer(t *testing.T, wg *sync.WaitGroup, tcpServer net.Listener, lines ...string) { go func() { defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) + conn, err := tcpServer.Accept() + if err != nil { + t.Error(err) + return + } + defer func() { + if err := conn.Close(); err != nil { + t.Error(err) + } + if err := tcpServer.Close(); err != nil { + t.Error(err) + } + }() + reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) + + for _, line := range lines { + readLine, err := tp.ReadLine() + if err != nil { + t.Error(err) + return + } + + if line != readLine { + t.Error(err) + return + } + } }() } -func TCPServer2(t *testing.T, wg *sync.WaitGroup) { +func tcpServer1(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn2, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn2) - tp := textproto.NewReader(reader) - data2, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) - data3, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) - require.NoError(t, conn2.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000") } -func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { +func tcpServer2(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, + "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", "my.prefix.192_168_0_1.my_measurement 3.14 1289430000") } -func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { +func tcpServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn2, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn2) - tp := textproto.NewReader(reader) - data2, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2) - data3, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3) - require.NoError(t, conn2.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000") } -func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) { +func tcpServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, + "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000") } -func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) { +func tcpServer1WithTags(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn2, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn2) - tp := textproto.NewReader(reader) - data2, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2) - data3, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3) - require.NoError(t, conn2.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000") } -func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { +func tcpServer2WithTags(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1) - require.NoError(t, conn.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, + "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000") } -func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { +func tcpServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") require.NoError(t, err) - go func() { - defer wg.Done() - conn2, err := (tcpServer).Accept() - require.NoError(t, err) - reader := bufio.NewReader(conn2) - tp := textproto.NewReader(reader) - data2, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2) - data3, err := tp.ReadLine() - require.NoError(t, err) - require.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3) - require.NoError(t, conn2.Close()) - require.NoError(t, tcpServer.Close()) - }() + + simulateTCPServer(t, wg, tcpServer, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000") +} + +func tcpServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { + tcpServer, err := net.Listen("tcp", "127.0.0.1:12003") + require.NoError(t, err) + + simulateTCPServer(t, wg, tcpServer, + "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000") } diff --git a/plugins/outputs/graylog/graylog_test_linux.go b/plugins/outputs/graylog/graylog_test_linux.go index fd5549102..5bb733c71 100644 --- a/plugins/outputs/graylog/graylog_test_linux.go +++ b/plugins/outputs/graylog/graylog_test_linux.go @@ -191,10 +191,25 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, namefieldnoprefix bool) string defer wg.Done() // in UDP scenario all 4 messages are received - require.NoError(t, recv()) - require.NoError(t, recv()) - require.NoError(t, recv()) - require.NoError(t, recv()) + err := recv() + if err != nil { + t.Error(err) + } + + err = recv() + if err != nil { + t.Error(err) + } + + err = recv() + if err != nil { + t.Error(err) + } + + err = recv() + if err != nil { + t.Error(err) + } }() return address } diff --git a/plugins/outputs/opentelemetry/opentelemetry_test.go b/plugins/outputs/opentelemetry/opentelemetry_test.go index 31de47359..6ccbf6553 100644 --- a/plugins/outputs/opentelemetry/opentelemetry_test.go +++ b/plugins/outputs/opentelemetry/opentelemetry_test.go @@ -105,7 +105,11 @@ func newMockOtelService(t *testing.T) *mockOtelService { } pmetricotlp.RegisterGRPCServer(grpcServer, mockOtelService) - go func() { require.NoError(t, grpcServer.Serve(listener)) }() + go func() { + if err := grpcServer.Serve(listener); err != nil { + t.Error(err) + } + }() grpcClient, err := grpc.NewClient(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) diff --git a/plugins/outputs/postgresql/postgresql_bench_test.go b/plugins/outputs/postgresql/postgresql_bench_test.go index 2d7832474..0e9a5255f 100644 --- a/plugins/outputs/postgresql/postgresql_bench_test.go +++ b/plugins/outputs/postgresql/postgresql_bench_test.go @@ -24,7 +24,8 @@ func BenchmarkPostgresql_concurrent(b *testing.B) { } func benchmarkPostgresql(b *testing.B, gen <-chan []telegraf.Metric, concurrency int, foreignTags bool) { - p := newPostgresqlTest(b) + p, err := newPostgresqlTest(b) + require.NoError(b, err) connection, err := p.Connection.Get() require.NoError(b, err) diff --git a/plugins/outputs/postgresql/postgresql_test.go b/plugins/outputs/postgresql/postgresql_test.go index 38dffff80..846f5c27f 100644 --- a/plugins/outputs/postgresql/postgresql_test.go +++ b/plugins/outputs/postgresql/postgresql_test.go @@ -230,7 +230,7 @@ type PostgresqlTest struct { Logger *LogAccumulator } -func newPostgresqlTest(tb testing.TB) *PostgresqlTest { +func newPostgresqlTest(tb testing.TB) (*PostgresqlTest, error) { if testing.Short() { tb.Skip("Skipping integration test in short mode") } @@ -257,8 +257,9 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest { } tb.Cleanup(container.Terminate) - err := container.Start() - require.NoError(tb, err, "failed to start container") + if err := container.Start(); err != nil { + return nil, fmt.Errorf("failed to start container: %w", err) + } p := newPostgresql() connection := fmt.Sprintf( @@ -273,12 +274,15 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest { logger := NewLogAccumulator(tb) p.Logger = logger p.LogLevel = "debug" - require.NoError(tb, p.Init()) + + if err := p.Init(); err != nil { + return nil, fmt.Errorf("failed to init plugin: %w", err) + } pt := &PostgresqlTest{Postgresql: p} pt.Logger = logger - return pt + return pt, nil } func TestPostgresqlConnectIntegration(t *testing.T) { @@ -286,11 +290,13 @@ func TestPostgresqlConnectIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) require.EqualValues(t, 1, p.db.Stat().MaxConns()) - p = newPostgresqlTest(t) + p, err = newPostgresqlTest(t) + require.NoError(t, err) connection, err := p.Connection.Get() require.NoError(t, err) p.Connection = config.NewSecret([]byte(connection.String() + " pool_max_conns=2")) @@ -411,7 +417,8 @@ func TestWriteIntegration_sequential(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) metrics := []telegraf.Metric{ @@ -448,7 +455,8 @@ func TestWriteIntegration_concurrent(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.dbConfig.MaxConns = 3 require.NoError(t, p.Connect()) @@ -507,7 +515,8 @@ func TestWriteIntegration_sequentialPermError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) metrics := []telegraf.Metric{ @@ -543,7 +552,8 @@ func TestWriteIntegration_sequentialSinglePermError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) metrics := []telegraf.Metric{ @@ -563,7 +573,8 @@ func TestWriteIntegration_concurrentPermError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.dbConfig.MaxConns = 2 require.NoError(t, p.Connect()) @@ -595,7 +606,8 @@ func TestWriteIntegration_sequentialTempError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) // To avoid a race condition, we need to know when our goroutine has started listening to the log. @@ -620,10 +632,13 @@ func TestWriteIntegration_sequentialTempError(t *testing.T) { conf.Logger = nil c, err := pgx.ConnectConfig(context.Background(), conf) if err != nil { + t.Error(err) return true } _, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid) - require.NoError(t, err) + if err != nil { + t.Error(err) + } return true }, false) }() @@ -643,7 +658,8 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.dbConfig.MaxConns = 2 require.NoError(t, p.Connect()) @@ -669,10 +685,13 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) { conf.Logger = nil c, err := pgx.ConnectConfig(context.Background(), conf) if err != nil { + t.Error(err) return true } _, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid) - require.NoError(t, err) + if err != nil { + t.Error(err) + } return true }, false) }() @@ -703,7 +722,8 @@ func TestTimestampColumnNameIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TimestampColumnName = "timestamp" require.NoError(t, p.Init()) require.NoError(t, p.Connect()) @@ -736,7 +756,8 @@ func TestWriteTagTableIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -772,7 +793,8 @@ func TestWriteIntegration_tagError(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -782,7 +804,7 @@ func TestWriteIntegration_tagError(t *testing.T) { require.NoError(t, p.Write(metrics)) // It'll have the table cached, so won't know we dropped it, will try insert, and get error. - _, err := p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"") + _, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"") require.NoError(t, err) metrics = []telegraf.Metric{ @@ -802,7 +824,8 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.ForeignTagConstraint = true require.NoError(t, p.Connect()) @@ -813,7 +836,7 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) { require.NoError(t, p.Write(metrics)) // It'll have the table cached, so won't know we dropped it, will try insert, and get error. - _, err := p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"") + _, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"") require.NoError(t, err) metrics = []telegraf.Metric{ @@ -839,7 +862,8 @@ func TestWriteIntegration_utf8(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -866,7 +890,8 @@ func TestWriteIntegration_UnsignedIntegers(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.Uint64Type = PgUint8 require.NoError(t, p.Init()) if err := p.Connect(); err != nil { @@ -902,7 +927,8 @@ func TestStressConcurrencyIntegration(t *testing.T) { concurrency := 4 loops := 100 - pctl := newPostgresqlTest(t) + pctl, err := newPostgresqlTest(t) + require.NoError(t, err) pctl.Logger.emitLevel = pgx.LogLevelWarn require.NoError(t, pctl.Connect()) @@ -916,18 +942,30 @@ func TestStressConcurrencyIntegration(t *testing.T) { copy(mShuf, metrics) rand.Shuffle(len(mShuf), func(a, b int) { mShuf[a], mShuf[b] = mShuf[b], mShuf[a] }) - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + if err != nil { + t.Error(err) + } + p.TagsAsForeignKeys = true p.Logger.emitLevel = pgx.LogLevelWarn p.dbConfig.MaxConns = int32(rand.Intn(3) + 1) - require.NoError(t, p.Connect()) + if err := p.Connect(); err != nil { + t.Error(err) + } wgStart.Done() wgStart.Wait() - err := p.Write(mShuf) - require.NoError(t, err) - require.NoError(t, p.Close()) - require.False(t, p.Logger.HasLevel(pgx.LogLevelWarn)) + if err := p.Write(mShuf); err != nil { + t.Error(err) + } + if err := p.Close(); err != nil { + t.Error(err) + } + if p.Logger.HasLevel(pgx.LogLevelWarn) { + t.Errorf("logger mustn't have a warning level") + } + wgDone.Done() }() } diff --git a/plugins/outputs/postgresql/table_manager_test.go b/plugins/outputs/postgresql/table_manager_test.go index 2f7b82f71..520419fb3 100644 --- a/plugins/outputs/postgresql/table_manager_test.go +++ b/plugins/outputs/postgresql/table_manager_test.go @@ -16,7 +16,8 @@ func TestTableManagerIntegration_EnsureStructure(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) cols := []utils.Column{ @@ -46,14 +47,15 @@ func TestTableManagerIntegration_EnsureStructure_alter(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) cols := []utils.Column{ p.columnFromTag("foo", ""), p.columnFromField("bar", 0), } - _, err := p.tableManager.EnsureStructure( + _, err = p.tableManager.EnsureStructure( ctx, p.db, p.tableManager.table(t.Name()), @@ -90,14 +92,15 @@ func TestTableManagerIntegration_EnsureStructure_overflowTableName(t *testing.T) t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) tbl := p.tableManager.table("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă") // 32 2-byte unicode characters = 64 bytes cols := []utils.Column{ p.columnFromField("foo", 0), } - _, err := p.tableManager.EnsureStructure( + _, err = p.tableManager.EnsureStructure( ctx, p.db, tbl, @@ -117,7 +120,8 @@ func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) tbl := p.tableManager.table(t.Name()) @@ -125,7 +129,7 @@ func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) { p.columnFromTag("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", "a"), // 32 2-byte unicode characters = 64 bytes p.columnFromField("foo", 0), } - _, err := p.tableManager.EnsureStructure( + _, err = p.tableManager.EnsureStructure( ctx, p.db, tbl, @@ -144,7 +148,8 @@ func TestTableManagerIntegration_EnsureStructure_overflowFieldName(t *testing.T) t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) tbl := p.tableManager.table(t.Name()) @@ -172,14 +177,15 @@ func TestTableManagerIntegration_getColumns(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) require.NoError(t, p.Connect()) cols := []utils.Column{ p.columnFromTag("foo", ""), p.columnFromField("baz", 0), } - _, err := p.tableManager.EnsureStructure( + _, err = p.tableManager.EnsureStructure( ctx, p.db, p.tableManager.table(t.Name()), @@ -206,7 +212,8 @@ func TestTableManagerIntegration_MatchSource(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -225,7 +232,8 @@ func TestTableManagerIntegration_MatchSource_UnsignedIntegers(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.Uint64Type = PgUint8 require.NoError(t, p.Init()) if err := p.Connect(); err != nil { @@ -250,7 +258,8 @@ func TestTableManagerIntegration_noCreateTable(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.CreateTemplates = nil require.NoError(t, p.Connect()) @@ -267,7 +276,8 @@ func TestTableManagerIntegration_noCreateTagTable(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagTableCreateTemplates = nil p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -286,7 +296,8 @@ func TestTableManagerIntegration_cache(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -304,7 +315,8 @@ func TestTableManagerIntegration_noAlterMissingTag(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.AddColumnTemplates = []*sqltemplate.Template{} require.NoError(t, p.Connect()) @@ -330,7 +342,8 @@ func TestTableManagerIntegration_noAlterMissingTagTableTag(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.TagTableAddColumnTemplates = []*sqltemplate.Template{} require.NoError(t, p.Connect()) @@ -358,7 +371,8 @@ func TestTableManagerIntegration_badAlterTagTable(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true tmpl := &sqltemplate.Template{} require.NoError(t, tmpl.UnmarshalText([]byte("bad"))) @@ -387,7 +401,8 @@ func TestTableManagerIntegration_noAlterMissingField(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.AddColumnTemplates = []*sqltemplate.Template{} require.NoError(t, p.Connect()) @@ -412,7 +427,8 @@ func TestTableManagerIntegration_badAlterField(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) tmpl := &sqltemplate.Template{} require.NoError(t, tmpl.UnmarshalText([]byte("bad"))) p.AddColumnTemplates = []*sqltemplate.Template{tmpl} @@ -434,7 +450,8 @@ func TestTableManagerIntegration_badAlterField(t *testing.T) { } func TestTableManager_addColumnTemplates(t *testing.T) { - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true require.NoError(t, p.Connect()) @@ -444,7 +461,8 @@ func TestTableManager_addColumnTemplates(t *testing.T) { tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()] require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc)) - p = newPostgresqlTest(t) + p, err = newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true tmpl := &sqltemplate.Template{} require.NoError(t, tmpl.UnmarshalText([]byte(`-- addColumnTemplate: {{ . }}`))) @@ -471,7 +489,8 @@ func TestTableManager_addColumnTemplates(t *testing.T) { } func TestTableManager_TimeWithTimezone(t *testing.T) { - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.TimestampColumnType = "timestamp with time zone" require.NoError(t, p.Init()) diff --git a/plugins/outputs/postgresql/table_source_test.go b/plugins/outputs/postgresql/table_source_test.go index ab24eebf8..6ea957dae 100644 --- a/plugins/outputs/postgresql/table_source_test.go +++ b/plugins/outputs/postgresql/table_source_test.go @@ -41,7 +41,8 @@ func TestTableSourceIntegration_tagJSONB(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsJsonb = true metrics := []telegraf.Metric{ @@ -64,7 +65,8 @@ func TestTableSourceIntegration_tagTable(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.tagsCache = freecache.NewCache(5 * 1024 * 1024) @@ -87,7 +89,8 @@ func TestTableSourceIntegration_tagTableJSONB(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.TagsAsJsonb = true p.tagsCache = freecache.NewCache(5 * 1024 * 1024) @@ -109,7 +112,8 @@ func TestTableSourceIntegration_fieldsJSONB(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.FieldsAsJsonb = true metrics := []telegraf.Metric{ @@ -131,7 +135,8 @@ func TestTableSourceIntegration_DropColumn_tag(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) metrics := []telegraf.Metric{ newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}), @@ -162,7 +167,8 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcTrue(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.ForeignTagConstraint = true p.tagsCache = freecache.NewCache(5 * 1024 * 1024) @@ -200,7 +206,8 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcFalse(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.ForeignTagConstraint = false p.tagsCache = freecache.NewCache(5 * 1024 * 1024) @@ -238,7 +245,8 @@ func TestTableSourceIntegration_DropColumn_field(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) metrics := []telegraf.Metric{ newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}), @@ -267,7 +275,8 @@ func TestTableSourceIntegration_InconsistentTags(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) metrics := []telegraf.Metric{ newMetric(t, "", MSS{"a": "1"}, MSI{"b": 2}), @@ -289,7 +298,8 @@ func TestTagTableSourceIntegration_InconsistentTags(t *testing.T) { t.Skip("Skipping integration test in short mode") } - p := newPostgresqlTest(t) + p, err := newPostgresqlTest(t) + require.NoError(t, err) p.TagsAsForeignKeys = true p.tagsCache = freecache.NewCache(5 * 1024 * 1024)