chore(linters): Fix findings found by `testifylint`: `go-require` for `plugins/outputs` (#15985)

This commit is contained in:
Paweł Żak 2024-10-09 16:33:36 +02:00 committed by GitHub
parent a16763497b
commit 438653591b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 248 additions and 225 deletions

View File

@ -234,6 +234,11 @@ func TestFileBoth(t *testing.T) {
require.NoError(t, err)
}
type erroredString struct {
str string
err error
}
func TestFileStdout(t *testing.T) {
// keep backup of the real stdout
old := os.Stdout
@ -261,13 +266,17 @@ func TestFileStdout(t *testing.T) {
err = f.Close()
require.NoError(t, err)
outC := make(chan string)
outC := make(chan erroredString)
// copy the output in a separate goroutine so printing can't block indefinitely
go func() {
var buf bytes.Buffer
_, err := io.Copy(&buf, r)
require.NoError(t, err)
outC <- buf.String()
if err != nil {
outC <- erroredString{err: err}
return
}
outC <- erroredString{str: buf.String()}
}()
// back to normal state
@ -278,7 +287,8 @@ func TestFileStdout(t *testing.T) {
os.Stdout = old
out := <-outC
require.Equal(t, expNewFile, out)
require.NoError(t, out.err)
require.Equal(t, expNewFile, out.str)
}
func createFile(t *testing.T) *os.File {

View File

@ -77,18 +77,7 @@ func TestGraphiteReconnect(t *testing.T) {
t.Log("Writing metric after server came up, we expect automatic reconnect on write without calling Connect() again")
require.NoError(t, g.Write([]telegraf.Metric{m}))
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, &wg, tcpServer, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000")
wg.Wait()
require.NoError(t, g.Close())
@ -99,7 +88,7 @@ func TestGraphiteOK(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1(t, &wg)
tcpServer1(t, &wg)
// Init plugin
g := Graphite{
@ -145,7 +134,7 @@ func TestGraphiteOK(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2(t, &wg2)
tcpServer2(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -163,18 +152,8 @@ func TestGraphiteStrictRegex(t *testing.T) {
t.Log("Starting server")
tcpServer, err := net.Listen("tcp", "127.0.0.1:12042")
require.NoError(t, err)
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, &wg, tcpServer, "192_168_0_1.|us-west-2|.mymeasurement.myfield 0.123 1289430000")
m := metric.New(
"mymeasurement",
@ -204,7 +183,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1(t, &wg)
tcpServer1(t, &wg)
// Init plugin
g := Graphite{
@ -251,7 +230,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2(t, &wg2)
tcpServer2(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -268,7 +247,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1(t, &wg)
tcpServer1(t, &wg)
// Init plugin
g := Graphite{
@ -315,7 +294,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2(t, &wg2)
tcpServer2(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -332,7 +311,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithMultipleTemplates(t, &wg)
tcpServer1WithMultipleTemplates(t, &wg)
// Init plugin
g := Graphite{
@ -383,7 +362,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithMultipleTemplates(t, &wg2)
tcpServer2WithMultipleTemplates(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -400,7 +379,7 @@ func TestGraphiteOkWithTags(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)
tcpServer1WithTags(t, &wg)
// Init plugin
g := Graphite{
@ -447,7 +426,7 @@ func TestGraphiteOkWithTags(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
tcpServer2WithTags(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -464,7 +443,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)
tcpServer1WithTags(t, &wg)
// Init plugin
g := Graphite{
@ -512,7 +491,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
tcpServer2WithTags(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -529,7 +508,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTagsSeparatorUnderscore(t, &wg)
tcpServer1WithTagsSeparatorUnderscore(t, &wg)
// Init plugin
g := Graphite{
@ -577,7 +556,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTagsSeparatorUnderscore(t, &wg2)
tcpServer2WithTagsSeparatorUnderscore(t, &wg2)
// Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
@ -695,150 +674,97 @@ func query(url string, data interface{}) error {
return json.Unmarshal(raw, &data)
}
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
func simulateTCPServer(t *testing.T, wg *sync.WaitGroup, tcpServer net.Listener, lines ...string) {
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
conn, err := tcpServer.Accept()
if err != nil {
t.Error(err)
return
}
defer func() {
if err := conn.Close(); err != nil {
t.Error(err)
}
if err := tcpServer.Close(); err != nil {
t.Error(err)
}
}()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
for _, line := range lines {
readLine, err := tp.ReadLine()
if err != nil {
t.Error(err)
return
}
if line != readLine {
t.Error(err)
return
}
}
}()
}
func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
func tcpServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn2, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
data3, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
require.NoError(t, conn2.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000")
}
func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
func tcpServer2(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer,
"my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", "my.prefix.192_168_0_1.my_measurement 3.14 1289430000")
}
func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
func tcpServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn2, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
data3, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
require.NoError(t, conn2.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000")
}
func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
func tcpServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer,
"my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000")
}
func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
func tcpServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn2, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
require.NoError(t, conn2.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000")
}
func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
func tcpServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1)
require.NoError(t, conn.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer,
"my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000")
}
func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
func tcpServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
go func() {
defer wg.Done()
conn2, err := (tcpServer).Accept()
require.NoError(t, err)
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, err := tp.ReadLine()
require.NoError(t, err)
require.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3)
require.NoError(t, conn2.Close())
require.NoError(t, tcpServer.Close())
}()
simulateTCPServer(t, wg, tcpServer, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000")
}
func tcpServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12003")
require.NoError(t, err)
simulateTCPServer(t, wg, tcpServer,
"my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000")
}

View File

@ -191,10 +191,25 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, namefieldnoprefix bool) string
defer wg.Done()
// in UDP scenario all 4 messages are received
require.NoError(t, recv())
require.NoError(t, recv())
require.NoError(t, recv())
require.NoError(t, recv())
err := recv()
if err != nil {
t.Error(err)
}
err = recv()
if err != nil {
t.Error(err)
}
err = recv()
if err != nil {
t.Error(err)
}
err = recv()
if err != nil {
t.Error(err)
}
}()
return address
}

View File

@ -105,7 +105,11 @@ func newMockOtelService(t *testing.T) *mockOtelService {
}
pmetricotlp.RegisterGRPCServer(grpcServer, mockOtelService)
go func() { require.NoError(t, grpcServer.Serve(listener)) }()
go func() {
if err := grpcServer.Serve(listener); err != nil {
t.Error(err)
}
}()
grpcClient, err := grpc.NewClient(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

View File

@ -24,7 +24,8 @@ func BenchmarkPostgresql_concurrent(b *testing.B) {
}
func benchmarkPostgresql(b *testing.B, gen <-chan []telegraf.Metric, concurrency int, foreignTags bool) {
p := newPostgresqlTest(b)
p, err := newPostgresqlTest(b)
require.NoError(b, err)
connection, err := p.Connection.Get()
require.NoError(b, err)

View File

@ -230,7 +230,7 @@ type PostgresqlTest struct {
Logger *LogAccumulator
}
func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
func newPostgresqlTest(tb testing.TB) (*PostgresqlTest, error) {
if testing.Short() {
tb.Skip("Skipping integration test in short mode")
}
@ -257,8 +257,9 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
}
tb.Cleanup(container.Terminate)
err := container.Start()
require.NoError(tb, err, "failed to start container")
if err := container.Start(); err != nil {
return nil, fmt.Errorf("failed to start container: %w", err)
}
p := newPostgresql()
connection := fmt.Sprintf(
@ -273,12 +274,15 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
logger := NewLogAccumulator(tb)
p.Logger = logger
p.LogLevel = "debug"
require.NoError(tb, p.Init())
if err := p.Init(); err != nil {
return nil, fmt.Errorf("failed to init plugin: %w", err)
}
pt := &PostgresqlTest{Postgresql: p}
pt.Logger = logger
return pt
return pt, nil
}
func TestPostgresqlConnectIntegration(t *testing.T) {
@ -286,11 +290,13 @@ func TestPostgresqlConnectIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
require.EqualValues(t, 1, p.db.Stat().MaxConns())
p = newPostgresqlTest(t)
p, err = newPostgresqlTest(t)
require.NoError(t, err)
connection, err := p.Connection.Get()
require.NoError(t, err)
p.Connection = config.NewSecret([]byte(connection.String() + " pool_max_conns=2"))
@ -411,7 +417,8 @@ func TestWriteIntegration_sequential(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
@ -448,7 +455,8 @@ func TestWriteIntegration_concurrent(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 3
require.NoError(t, p.Connect())
@ -507,7 +515,8 @@ func TestWriteIntegration_sequentialPermError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
@ -543,7 +552,8 @@ func TestWriteIntegration_sequentialSinglePermError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
@ -563,7 +573,8 @@ func TestWriteIntegration_concurrentPermError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 2
require.NoError(t, p.Connect())
@ -595,7 +606,8 @@ func TestWriteIntegration_sequentialTempError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
// To avoid a race condition, we need to know when our goroutine has started listening to the log.
@ -620,10 +632,13 @@ func TestWriteIntegration_sequentialTempError(t *testing.T) {
conf.Logger = nil
c, err := pgx.ConnectConfig(context.Background(), conf)
if err != nil {
t.Error(err)
return true
}
_, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid)
require.NoError(t, err)
if err != nil {
t.Error(err)
}
return true
}, false)
}()
@ -643,7 +658,8 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 2
require.NoError(t, p.Connect())
@ -669,10 +685,13 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
conf.Logger = nil
c, err := pgx.ConnectConfig(context.Background(), conf)
if err != nil {
t.Error(err)
return true
}
_, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid)
require.NoError(t, err)
if err != nil {
t.Error(err)
}
return true
}, false)
}()
@ -703,7 +722,8 @@ func TestTimestampColumnNameIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TimestampColumnName = "timestamp"
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
@ -736,7 +756,8 @@ func TestWriteTagTableIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -772,7 +793,8 @@ func TestWriteIntegration_tagError(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -782,7 +804,7 @@ func TestWriteIntegration_tagError(t *testing.T) {
require.NoError(t, p.Write(metrics))
// It'll have the table cached, so won't know we dropped it, will try insert, and get error.
_, err := p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
require.NoError(t, err)
metrics = []telegraf.Metric{
@ -802,7 +824,8 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = true
require.NoError(t, p.Connect())
@ -813,7 +836,7 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
require.NoError(t, p.Write(metrics))
// It'll have the table cached, so won't know we dropped it, will try insert, and get error.
_, err := p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
require.NoError(t, err)
metrics = []telegraf.Metric{
@ -839,7 +862,8 @@ func TestWriteIntegration_utf8(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -866,7 +890,8 @@ func TestWriteIntegration_UnsignedIntegers(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.Uint64Type = PgUint8
require.NoError(t, p.Init())
if err := p.Connect(); err != nil {
@ -902,7 +927,8 @@ func TestStressConcurrencyIntegration(t *testing.T) {
concurrency := 4
loops := 100
pctl := newPostgresqlTest(t)
pctl, err := newPostgresqlTest(t)
require.NoError(t, err)
pctl.Logger.emitLevel = pgx.LogLevelWarn
require.NoError(t, pctl.Connect())
@ -916,18 +942,30 @@ func TestStressConcurrencyIntegration(t *testing.T) {
copy(mShuf, metrics)
rand.Shuffle(len(mShuf), func(a, b int) { mShuf[a], mShuf[b] = mShuf[b], mShuf[a] })
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
if err != nil {
t.Error(err)
}
p.TagsAsForeignKeys = true
p.Logger.emitLevel = pgx.LogLevelWarn
p.dbConfig.MaxConns = int32(rand.Intn(3) + 1)
require.NoError(t, p.Connect())
if err := p.Connect(); err != nil {
t.Error(err)
}
wgStart.Done()
wgStart.Wait()
err := p.Write(mShuf)
require.NoError(t, err)
require.NoError(t, p.Close())
require.False(t, p.Logger.HasLevel(pgx.LogLevelWarn))
if err := p.Write(mShuf); err != nil {
t.Error(err)
}
if err := p.Close(); err != nil {
t.Error(err)
}
if p.Logger.HasLevel(pgx.LogLevelWarn) {
t.Errorf("logger mustn't have a warning level")
}
wgDone.Done()
}()
}

View File

@ -16,7 +16,8 @@ func TestTableManagerIntegration_EnsureStructure(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
@ -46,14 +47,15 @@ func TestTableManagerIntegration_EnsureStructure_alter(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
p.columnFromTag("foo", ""),
p.columnFromField("bar", 0),
}
_, err := p.tableManager.EnsureStructure(
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
@ -90,14 +92,15 @@ func TestTableManagerIntegration_EnsureStructure_overflowTableName(t *testing.T)
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă") // 32 2-byte unicode characters = 64 bytes
cols := []utils.Column{
p.columnFromField("foo", 0),
}
_, err := p.tableManager.EnsureStructure(
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
tbl,
@ -117,7 +120,8 @@ func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table(t.Name())
@ -125,7 +129,7 @@ func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) {
p.columnFromTag("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", "a"), // 32 2-byte unicode characters = 64 bytes
p.columnFromField("foo", 0),
}
_, err := p.tableManager.EnsureStructure(
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
tbl,
@ -144,7 +148,8 @@ func TestTableManagerIntegration_EnsureStructure_overflowFieldName(t *testing.T)
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table(t.Name())
@ -172,14 +177,15 @@ func TestTableManagerIntegration_getColumns(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
p.columnFromTag("foo", ""),
p.columnFromField("baz", 0),
}
_, err := p.tableManager.EnsureStructure(
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
@ -206,7 +212,8 @@ func TestTableManagerIntegration_MatchSource(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -225,7 +232,8 @@ func TestTableManagerIntegration_MatchSource_UnsignedIntegers(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.Uint64Type = PgUint8
require.NoError(t, p.Init())
if err := p.Connect(); err != nil {
@ -250,7 +258,8 @@ func TestTableManagerIntegration_noCreateTable(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.CreateTemplates = nil
require.NoError(t, p.Connect())
@ -267,7 +276,8 @@ func TestTableManagerIntegration_noCreateTagTable(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagTableCreateTemplates = nil
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -286,7 +296,8 @@ func TestTableManagerIntegration_cache(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -304,7 +315,8 @@ func TestTableManagerIntegration_noAlterMissingTag(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.AddColumnTemplates = []*sqltemplate.Template{}
require.NoError(t, p.Connect())
@ -330,7 +342,8 @@ func TestTableManagerIntegration_noAlterMissingTagTableTag(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TagTableAddColumnTemplates = []*sqltemplate.Template{}
require.NoError(t, p.Connect())
@ -358,7 +371,8 @@ func TestTableManagerIntegration_badAlterTagTable(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
@ -387,7 +401,8 @@ func TestTableManagerIntegration_noAlterMissingField(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.AddColumnTemplates = []*sqltemplate.Template{}
require.NoError(t, p.Connect())
@ -412,7 +427,8 @@ func TestTableManagerIntegration_badAlterField(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
p.AddColumnTemplates = []*sqltemplate.Template{tmpl}
@ -434,7 +450,8 @@ func TestTableManagerIntegration_badAlterField(t *testing.T) {
}
func TestTableManager_addColumnTemplates(t *testing.T) {
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
@ -444,7 +461,8 @@ func TestTableManager_addColumnTemplates(t *testing.T) {
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
p = newPostgresqlTest(t)
p, err = newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte(`-- addColumnTemplate: {{ . }}`)))
@ -471,7 +489,8 @@ func TestTableManager_addColumnTemplates(t *testing.T) {
}
func TestTableManager_TimeWithTimezone(t *testing.T) {
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TimestampColumnType = "timestamp with time zone"
require.NoError(t, p.Init())

View File

@ -41,7 +41,8 @@ func TestTableSourceIntegration_tagJSONB(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsJsonb = true
metrics := []telegraf.Metric{
@ -64,7 +65,8 @@ func TestTableSourceIntegration_tagTable(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
@ -87,7 +89,8 @@ func TestTableSourceIntegration_tagTableJSONB(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TagsAsJsonb = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
@ -109,7 +112,8 @@ func TestTableSourceIntegration_fieldsJSONB(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.FieldsAsJsonb = true
metrics := []telegraf.Metric{
@ -131,7 +135,8 @@ func TestTableSourceIntegration_DropColumn_tag(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
@ -162,7 +167,8 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcTrue(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
@ -200,7 +206,8 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcFalse(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = false
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
@ -238,7 +245,8 @@ func TestTableSourceIntegration_DropColumn_field(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
@ -267,7 +275,8 @@ func TestTableSourceIntegration_InconsistentTags(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "1"}, MSI{"b": 2}),
@ -289,7 +298,8 @@ func TestTagTableSourceIntegration_InconsistentTags(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
p := newPostgresqlTest(t)
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)