test: harden running of testcontainer integration tests (#11245)
This commit is contained in:
parent
c6ed8bb807
commit
9daefaba11
|
|
@ -505,7 +505,7 @@ var testEsAggregationData = []esAggregationQueryTest{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
|
func setupIntegrationTest(t *testing.T) (*testutil.Container, error) {
|
||||||
type nginxlog struct {
|
type nginxlog struct {
|
||||||
IPaddress string `json:"IP"`
|
IPaddress string `json:"IP"`
|
||||||
Timestamp time.Time `json:"@timestamp"`
|
Timestamp time.Time `json:"@timestamp"`
|
||||||
|
|
@ -542,7 +542,7 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
|
||||||
|
|
||||||
err = e.connectToES()
|
err = e.connectToES()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container, err
|
return &container, err
|
||||||
}
|
}
|
||||||
|
|
||||||
bulkRequest := e.esClient.Bulk()
|
bulkRequest := e.esClient.Bulk()
|
||||||
|
|
@ -550,7 +550,7 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
|
||||||
// populate elasticsearch with nginx_logs test data file
|
// populate elasticsearch with nginx_logs test data file
|
||||||
file, err := os.Open("testdata/nginx_logs")
|
file, err := os.Open("testdata/nginx_logs")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container, err
|
return &container, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
|
|
@ -579,22 +579,22 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
|
||||||
Doc(logline))
|
Doc(logline))
|
||||||
}
|
}
|
||||||
if scanner.Err() != nil {
|
if scanner.Err() != nil {
|
||||||
return container, err
|
return &container, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = bulkRequest.Do(context.Background())
|
_, err = bulkRequest.Do(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container, err
|
return &container, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// force elastic to refresh indexes to get new batch data
|
// force elastic to refresh indexes to get new batch data
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
_, err = e.esClient.Refresh().Do(ctx)
|
_, err = e.esClient.Refresh().Do(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container, err
|
return &container, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return container, nil
|
return &container, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestElasticsearchQuery(t *testing.T) {
|
func TestElasticsearchQuery(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,10 @@ func TestMysqlDefaultsToLocalIntegration(t *testing.T) {
|
||||||
"MYSQL_ALLOW_EMPTY_PASSWORD": "yes",
|
"MYSQL_ALLOW_EMPTY_PASSWORD": "yes",
|
||||||
},
|
},
|
||||||
ExposedPorts: []string{servicePort},
|
ExposedPorts: []string{servicePort},
|
||||||
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("/usr/sbin/mysqld: ready for connections"),
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
err := container.Start()
|
err := container.Start()
|
||||||
|
|
@ -59,7 +62,10 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) {
|
||||||
"MYSQL_ALLOW_EMPTY_PASSWORD": "yes",
|
"MYSQL_ALLOW_EMPTY_PASSWORD": "yes",
|
||||||
},
|
},
|
||||||
ExposedPorts: []string{servicePort},
|
ExposedPorts: []string{servicePort},
|
||||||
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("/usr/sbin/mysqld: ready for connections"),
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
err := container.Start()
|
err := container.Start()
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ func TestOpenldapGeneratesMetricsIntegration(t *testing.T) {
|
||||||
"LDAP_ADMIN_PASSWORD": "secret",
|
"LDAP_ADMIN_PASSWORD": "secret",
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePort)),
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
@ -128,7 +128,7 @@ func TestOpenldapStartTLSIntegration(t *testing.T) {
|
||||||
"/server.key": tlsKey,
|
"/server.key": tlsKey,
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePort)),
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
@ -191,7 +191,7 @@ func TestOpenldapLDAPSIntegration(t *testing.T) {
|
||||||
"/server.key": tlsKey,
|
"/server.key": tlsKey,
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePortSecure)),
|
wait.ForListeningPort(nat.Port(servicePortSecure)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
@ -249,7 +249,7 @@ func TestOpenldapInvalidSSLIntegration(t *testing.T) {
|
||||||
"/server.key": tlsKey,
|
"/server.key": tlsKey,
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePortSecure)),
|
wait.ForListeningPort(nat.Port(servicePortSecure)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
@ -289,7 +289,7 @@ func TestOpenldapBindIntegration(t *testing.T) {
|
||||||
"LDAP_ADMIN_PASSWORD": "secret",
|
"LDAP_ADMIN_PASSWORD": "secret",
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePort)),
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
@ -341,7 +341,7 @@ func TestOpenldapReverseMetricsIntegration(t *testing.T) {
|
||||||
"LDAP_ADMIN_PASSWORD": "secret",
|
"LDAP_ADMIN_PASSWORD": "secret",
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForAll(
|
WaitingFor: wait.ForAll(
|
||||||
wait.ForLog("Starting slapd"),
|
wait.ForLog("slapd starting"),
|
||||||
wait.ForListeningPort(nat.Port(servicePort)),
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,10 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
|
||||||
"PG_ENV_POSTGRESQL_USER": "pgbouncer",
|
"PG_ENV_POSTGRESQL_USER": "pgbouncer",
|
||||||
"PG_ENV_POSTGRESQL_PASS": "pgbouncer",
|
"PG_ENV_POSTGRESQL_PASS": "pgbouncer",
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForListeningPort(nat.Port(pgBouncerServicePort)),
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForListeningPort(nat.Port(pgBouncerServicePort)),
|
||||||
|
wait.ForLog("LOG process up"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
err = container.Start()
|
err = container.Start()
|
||||||
require.NoError(t, err, "failed to start container")
|
require.NoError(t, err, "failed to start container")
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package sql
|
package sql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -9,8 +8,8 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/docker/go-connections/nat"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -37,50 +36,35 @@ func TestMariaDB(t *testing.T) {
|
||||||
|
|
||||||
logger := testutil.Logger{}
|
logger := testutil.Logger{}
|
||||||
|
|
||||||
addr := "127.0.0.1"
|
|
||||||
port := "3306"
|
port := "3306"
|
||||||
passwd := ""
|
passwd := pwgen(32)
|
||||||
database := "foo"
|
database := "foo"
|
||||||
|
|
||||||
logger.Infof("Spinning up container...")
|
|
||||||
|
|
||||||
// Generate a random password
|
|
||||||
passwd = pwgen(32)
|
|
||||||
|
|
||||||
// Determine the test-data mountpoint
|
// Determine the test-data mountpoint
|
||||||
testdata, err := filepath.Abs("testdata/mariadb")
|
testdata, err := filepath.Abs("testdata/mariadb")
|
||||||
require.NoError(t, err, "determining absolute path of test-data failed")
|
require.NoError(t, err, "determining absolute path of test-data failed")
|
||||||
|
|
||||||
// Spin-up the container
|
container := testutil.Container{
|
||||||
ctx := context.Background()
|
Image: "mariadb",
|
||||||
req := testcontainers.GenericContainerRequest{
|
ExposedPorts: []string{port},
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
Env: map[string]string{
|
||||||
Image: "mariadb",
|
"MYSQL_ROOT_PASSWORD": passwd,
|
||||||
Env: map[string]string{
|
"MYSQL_DATABASE": database,
|
||||||
"MYSQL_ROOT_PASSWORD": passwd,
|
|
||||||
"MYSQL_DATABASE": database,
|
|
||||||
},
|
|
||||||
BindMounts: map[string]string{
|
|
||||||
"/docker-entrypoint-initdb.d": testdata,
|
|
||||||
},
|
|
||||||
ExposedPorts: []string{"3306/tcp"},
|
|
||||||
WaitingFor: wait.ForListeningPort("3306/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
BindMounts: map[string]string{
|
||||||
|
"/docker-entrypoint-initdb.d": testdata,
|
||||||
|
},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("Buffer pool(s) load completed at"),
|
||||||
|
wait.ForListeningPort(nat.Port(port)),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
container, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, container.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
addr, err = container.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
p, err := container.MappedPort(ctx, "3306/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port = p.Port()
|
|
||||||
|
|
||||||
// Define the testset
|
// Define the testset
|
||||||
var testset = []struct {
|
var testset = []struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -119,8 +103,13 @@ func TestMariaDB(t *testing.T) {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
// Setup the plugin-under-test
|
// Setup the plugin-under-test
|
||||||
plugin := &SQL{
|
plugin := &SQL{
|
||||||
Driver: "maria",
|
Driver: "maria",
|
||||||
Dsn: fmt.Sprintf("root:%s@tcp(%s:%s)/%s", passwd, addr, port, database),
|
Dsn: fmt.Sprintf("root:%s@tcp(%s:%s)/%s",
|
||||||
|
passwd,
|
||||||
|
container.Address,
|
||||||
|
container.Ports[port],
|
||||||
|
database,
|
||||||
|
),
|
||||||
Queries: tt.queries,
|
Queries: tt.queries,
|
||||||
Log: logger,
|
Log: logger,
|
||||||
}
|
}
|
||||||
|
|
@ -154,50 +143,35 @@ func TestPostgreSQL(t *testing.T) {
|
||||||
|
|
||||||
logger := testutil.Logger{}
|
logger := testutil.Logger{}
|
||||||
|
|
||||||
addr := "127.0.0.1"
|
|
||||||
port := "5432"
|
port := "5432"
|
||||||
passwd := ""
|
passwd := pwgen(32)
|
||||||
database := "foo"
|
database := "foo"
|
||||||
|
|
||||||
logger.Infof("Spinning up container...")
|
|
||||||
|
|
||||||
// Generate a random password
|
|
||||||
passwd = pwgen(32)
|
|
||||||
|
|
||||||
// Determine the test-data mountpoint
|
// Determine the test-data mountpoint
|
||||||
testdata, err := filepath.Abs("testdata/postgres")
|
testdata, err := filepath.Abs("testdata/postgres")
|
||||||
require.NoError(t, err, "determining absolute path of test-data failed")
|
require.NoError(t, err, "determining absolute path of test-data failed")
|
||||||
|
|
||||||
// Spin-up the container
|
container := testutil.Container{
|
||||||
ctx := context.Background()
|
Image: "postgres",
|
||||||
req := testcontainers.GenericContainerRequest{
|
ExposedPorts: []string{port},
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
Env: map[string]string{
|
||||||
Image: "postgres",
|
"POSTGRES_PASSWORD": passwd,
|
||||||
Env: map[string]string{
|
"POSTGRES_DB": database,
|
||||||
"POSTGRES_PASSWORD": passwd,
|
|
||||||
"POSTGRES_DB": database,
|
|
||||||
},
|
|
||||||
BindMounts: map[string]string{
|
|
||||||
"/docker-entrypoint-initdb.d": testdata,
|
|
||||||
},
|
|
||||||
ExposedPorts: []string{"5432/tcp"},
|
|
||||||
WaitingFor: wait.ForListeningPort("5432/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
BindMounts: map[string]string{
|
||||||
|
"/docker-entrypoint-initdb.d": testdata,
|
||||||
|
},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("database system is ready to accept connections"),
|
||||||
|
wait.ForListeningPort(nat.Port(port)),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
container, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, container.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
addr, err = container.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
p, err := container.MappedPort(ctx, "5432/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port = p.Port()
|
|
||||||
|
|
||||||
// Define the testset
|
// Define the testset
|
||||||
var testset = []struct {
|
var testset = []struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -236,8 +210,13 @@ func TestPostgreSQL(t *testing.T) {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
// Setup the plugin-under-test
|
// Setup the plugin-under-test
|
||||||
plugin := &SQL{
|
plugin := &SQL{
|
||||||
Driver: "pgx",
|
Driver: "pgx",
|
||||||
Dsn: fmt.Sprintf("postgres://postgres:%v@%v:%v/%v", passwd, addr, port, database),
|
Dsn: fmt.Sprintf("postgres://postgres:%v@%v:%v/%v",
|
||||||
|
passwd,
|
||||||
|
container.Address,
|
||||||
|
container.Ports[port],
|
||||||
|
database,
|
||||||
|
),
|
||||||
Queries: tt.queries,
|
Queries: tt.queries,
|
||||||
Log: logger,
|
Log: logger,
|
||||||
}
|
}
|
||||||
|
|
@ -271,42 +250,31 @@ func TestClickHouse(t *testing.T) {
|
||||||
|
|
||||||
logger := testutil.Logger{}
|
logger := testutil.Logger{}
|
||||||
|
|
||||||
addr := "127.0.0.1"
|
|
||||||
port := "9000"
|
port := "9000"
|
||||||
user := "default"
|
user := "default"
|
||||||
|
|
||||||
logger.Infof("Spinning up container...")
|
|
||||||
|
|
||||||
// Determine the test-data mountpoint
|
// Determine the test-data mountpoint
|
||||||
testdata, err := filepath.Abs("testdata/clickhouse")
|
testdata, err := filepath.Abs("testdata/clickhouse")
|
||||||
require.NoError(t, err, "determining absolute path of test-data failed")
|
require.NoError(t, err, "determining absolute path of test-data failed")
|
||||||
|
|
||||||
// Spin-up the container
|
container := testutil.Container{
|
||||||
ctx := context.Background()
|
Image: "yandex/clickhouse-server",
|
||||||
req := testcontainers.GenericContainerRequest{
|
ExposedPorts: []string{port, "8123"},
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
BindMounts: map[string]string{
|
||||||
Image: "yandex/clickhouse-server",
|
"/docker-entrypoint-initdb.d": testdata,
|
||||||
BindMounts: map[string]string{
|
|
||||||
"/docker-entrypoint-initdb.d": testdata,
|
|
||||||
},
|
|
||||||
ExposedPorts: []string{"9000/tcp", "8123/tcp"},
|
|
||||||
WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")),
|
||||||
|
wait.ForListeningPort(nat.Port(port)),
|
||||||
|
wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
container, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, container.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
addr, err = container.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
p, err := container.MappedPort(ctx, "9000/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port = p.Port()
|
|
||||||
|
|
||||||
// Define the testset
|
// Define the testset
|
||||||
var testset = []struct {
|
var testset = []struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -345,8 +313,12 @@ func TestClickHouse(t *testing.T) {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
// Setup the plugin-under-test
|
// Setup the plugin-under-test
|
||||||
plugin := &SQL{
|
plugin := &SQL{
|
||||||
Driver: "clickhouse",
|
Driver: "clickhouse",
|
||||||
Dsn: fmt.Sprintf("tcp://%v:%v?username=%v", addr, port, user),
|
Dsn: fmt.Sprintf("tcp://%v:%v?username=%v",
|
||||||
|
container.Address,
|
||||||
|
container.Ports[port],
|
||||||
|
user,
|
||||||
|
),
|
||||||
Queries: tt.queries,
|
Queries: tt.queries,
|
||||||
Log: logger,
|
Log: logger,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,10 @@ func TestZookeeperGeneratesMetricsIntegration(t *testing.T) {
|
||||||
Env: map[string]string{
|
Env: map[string]string{
|
||||||
"ZOO_4LW_COMMANDS_WHITELIST": "mntr",
|
"ZOO_4LW_COMMANDS_WHITELIST": "mntr",
|
||||||
},
|
},
|
||||||
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("ZooKeeper audit is disabled."),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
err := container.Start()
|
err := container.Start()
|
||||||
require.NoError(t, err, "failed to start container")
|
require.NoError(t, err, "failed to start container")
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,16 @@
|
||||||
package mongodb
|
package mongodb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/go-connections/nat"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -20,34 +19,27 @@ func TestConnectAndWriteIntegrationNoAuth(t *testing.T) {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
req := testcontainers.GenericContainerRequest{
|
servicePort := "27017"
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
container := testutil.Container{
|
||||||
Image: "mongo",
|
Image: "mongo",
|
||||||
ExposedPorts: []string{"27017/tcp"},
|
ExposedPorts: []string{servicePort},
|
||||||
WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"),
|
WaitingFor: wait.ForAll(
|
||||||
},
|
wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)),
|
||||||
Started: true,
|
wait.ForLog("Waiting for connections"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
err := container.Start()
|
||||||
ctx := context.Background()
|
require.NoError(t, err, "failed to start container")
|
||||||
container, err := testcontainers.GenericContainer(ctx, req)
|
|
||||||
require.NoError(t, err, "starting container failed")
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, container.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
host, err := container.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
|
|
||||||
natPort, err := container.MappedPort(ctx, "27017/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
// Run test
|
// Run test
|
||||||
plugin := &MongoDB{
|
plugin := &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address,
|
||||||
|
container.Ports[servicePort],
|
||||||
|
),
|
||||||
AuthenticationType: "NONE",
|
AuthenticationType: "NONE",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -68,34 +60,24 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) {
|
||||||
initdb, err := filepath.Abs("testdata/auth_scram")
|
initdb, err := filepath.Abs("testdata/auth_scram")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req := testcontainers.GenericContainerRequest{
|
servicePort := "27017"
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
container := testutil.Container{
|
||||||
Image: "mongo",
|
Image: "mongo",
|
||||||
BindMounts: map[string]string{
|
ExposedPorts: []string{servicePort},
|
||||||
"/docker-entrypoint-initdb.d": initdb,
|
BindMounts: map[string]string{
|
||||||
},
|
"/docker-entrypoint-initdb.d": initdb,
|
||||||
ExposedPorts: []string{"27017/tcp"},
|
|
||||||
WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("Waiting for connections"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
err = container.Start()
|
||||||
ctx := context.Background()
|
require.NoError(t, err, "failed to start container")
|
||||||
container, err := testcontainers.GenericContainer(ctx, req)
|
|
||||||
require.NoError(t, err, "starting container failed")
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, container.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
host, err := container.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
|
|
||||||
natPort, err := container.MappedPort(ctx, "27017/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *MongoDB
|
plugin *MongoDB
|
||||||
|
|
@ -104,7 +86,8 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "success with scram authentication",
|
name: "success with scram authentication",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s/admin", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s/admin",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "SCRAM",
|
AuthenticationType: "SCRAM",
|
||||||
Username: "root",
|
Username: "root",
|
||||||
Password: "changeme",
|
Password: "changeme",
|
||||||
|
|
@ -118,7 +101,8 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "fail with scram authentication bad password",
|
name: "fail with scram authentication bad password",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s/admin", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s/admin",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "SCRAM",
|
AuthenticationType: "SCRAM",
|
||||||
Username: "root",
|
Username: "root",
|
||||||
Password: "root",
|
Password: "root",
|
||||||
|
|
@ -172,43 +156,33 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
serverpem, err := filepath.Abs(pki.ServerCertAndKeyPath())
|
serverpem, err := filepath.Abs(pki.ServerCertAndKeyPath())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req := testcontainers.GenericContainerRequest{
|
servicePort := "27017"
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
container := testutil.Container{
|
||||||
Image: "mongo",
|
Image: "mongo",
|
||||||
BindMounts: map[string]string{
|
ExposedPorts: []string{servicePort},
|
||||||
"/docker-entrypoint-initdb.d": initdb,
|
BindMounts: map[string]string{
|
||||||
"/cacert.pem": cacert,
|
"/docker-entrypoint-initdb.d": initdb,
|
||||||
"/server.pem": serverpem,
|
"/cacert.pem": cacert,
|
||||||
},
|
"/server.pem": serverpem,
|
||||||
ExposedPorts: []string{"27017/tcp"},
|
|
||||||
Entrypoint: []string{
|
|
||||||
"docker-entrypoint.sh",
|
|
||||||
"--auth", "--setParameter", "authenticationMechanisms=MONGODB-X509",
|
|
||||||
"--tlsMode", "preferTLS",
|
|
||||||
"--tlsCAFile", "/cacert.pem",
|
|
||||||
"--tlsCertificateKeyFile", "/server.pem",
|
|
||||||
},
|
|
||||||
WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
Entrypoint: []string{
|
||||||
|
"docker-entrypoint.sh",
|
||||||
|
"--auth", "--setParameter", "authenticationMechanisms=MONGODB-X509",
|
||||||
|
"--tlsMode", "preferTLS",
|
||||||
|
"--tlsCAFile", "/cacert.pem",
|
||||||
|
"--tlsCertificateKeyFile", "/server.pem",
|
||||||
|
},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("Waiting for connections"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
err = container.Start()
|
||||||
ctx := context.Background()
|
require.NoError(t, err, "failed to start container")
|
||||||
cont, err := testcontainers.GenericContainer(ctx, req)
|
|
||||||
require.NoError(t, err, "starting container failed")
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, cont.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
host, err := cont.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
|
|
||||||
natPort, err := cont.MappedPort(ctx, "27017/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *MongoDB
|
plugin *MongoDB
|
||||||
|
|
@ -217,7 +191,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "success with x509 authentication",
|
name: "success with x509 authentication",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -236,7 +211,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "success with x509 authentication using encrypted key file",
|
name: "success with x509 authentication using encrypted key file",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -256,7 +232,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "success with x509 authentication missing ca and using insceure tls",
|
name: "success with x509 authentication missing ca and using insceure tls",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -274,7 +251,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "fail with x509 authentication missing ca",
|
name: "fail with x509 authentication missing ca",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -292,7 +270,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "fail with x509 authentication using encrypted key file",
|
name: "fail with x509 authentication using encrypted key file",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -312,7 +291,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "fail with x509 authentication using invalid ca",
|
name: "fail with x509 authentication using invalid ca",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
@ -331,7 +311,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "fail with x509 authentication using invalid key",
|
name: "fail with x509 authentication using invalid key",
|
||||||
plugin: &MongoDB{
|
plugin: &MongoDB{
|
||||||
Dsn: fmt.Sprintf("mongodb://localhost:%s", port),
|
Dsn: fmt.Sprintf("mongodb://%s:%s",
|
||||||
|
container.Address, container.Ports[servicePort]),
|
||||||
AuthenticationType: "X509",
|
AuthenticationType: "X509",
|
||||||
MetricDatabase: "telegraf_test",
|
MetricDatabase: "telegraf_test",
|
||||||
MetricGranularity: "seconds",
|
MetricGranularity: "seconds",
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package sql
|
package sql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -9,11 +8,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/go-connections/nat"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -171,40 +170,31 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
password := pwgen(32)
|
password := pwgen(32)
|
||||||
outDir := t.TempDir()
|
outDir := t.TempDir()
|
||||||
|
|
||||||
ctx := context.Background()
|
servicePort := "3306"
|
||||||
req := testcontainers.GenericContainerRequest{
|
container := testutil.Container{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
Image: "mariadb",
|
||||||
Image: "mariadb",
|
Env: map[string]string{
|
||||||
Env: map[string]string{
|
"MARIADB_ROOT_PASSWORD": password,
|
||||||
"MARIADB_ROOT_PASSWORD": password,
|
|
||||||
},
|
|
||||||
BindMounts: map[string]string{
|
|
||||||
"/docker-entrypoint-initdb.d": initdb,
|
|
||||||
"/out": outDir,
|
|
||||||
},
|
|
||||||
ExposedPorts: []string{"3306/tcp"},
|
|
||||||
WaitingFor: wait.ForListeningPort("3306/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
BindMounts: map[string]string{
|
||||||
|
"/docker-entrypoint-initdb.d": initdb,
|
||||||
|
"/out": outDir,
|
||||||
|
},
|
||||||
|
ExposedPorts: []string{servicePort},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("Buffer pool(s) load completed at"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
mariadbContainer, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, mariadbContainer.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
host, err := mariadbContainer.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
natPort, err := mariadbContainer.MappedPort(ctx, "3306/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
//use the plugin to write to the database
|
//use the plugin to write to the database
|
||||||
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
||||||
username, password, host, port, dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)
|
||||||
p := newSQL()
|
p := newSQL()
|
||||||
p.Log = testutil.Logger{}
|
p.Log = testutil.Logger{}
|
||||||
|
|
@ -220,7 +210,7 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
|
|
||||||
//dump the database
|
//dump the database
|
||||||
var rc int
|
var rc int
|
||||||
rc, err = mariadbContainer.Exec(ctx, []string{
|
rc, err = container.Exec([]string{
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
"mariadb-dump --user=" + username +
|
"mariadb-dump --user=" + username +
|
||||||
|
|
@ -259,41 +249,32 @@ func TestPostgresIntegration(t *testing.T) {
|
||||||
password := pwgen(32)
|
password := pwgen(32)
|
||||||
outDir := t.TempDir()
|
outDir := t.TempDir()
|
||||||
|
|
||||||
ctx := context.Background()
|
servicePort := "5432"
|
||||||
req := testcontainers.GenericContainerRequest{
|
container := testutil.Container{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
Image: "postgres",
|
||||||
Image: "postgres",
|
Env: map[string]string{
|
||||||
Env: map[string]string{
|
"POSTGRES_PASSWORD": password,
|
||||||
"POSTGRES_PASSWORD": password,
|
|
||||||
},
|
|
||||||
BindMounts: map[string]string{
|
|
||||||
"/docker-entrypoint-initdb.d": initdb,
|
|
||||||
"/out": outDir,
|
|
||||||
},
|
|
||||||
ExposedPorts: []string{"5432/tcp"},
|
|
||||||
WaitingFor: wait.ForListeningPort("5432/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
BindMounts: map[string]string{
|
||||||
|
"/docker-entrypoint-initdb.d": initdb,
|
||||||
|
"/out": outDir,
|
||||||
|
},
|
||||||
|
ExposedPorts: []string{servicePort},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("database system is ready to accept connections"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
cont, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, cont.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
host, err := cont.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
natPort, err := cont.MappedPort(ctx, "5432/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
//use the plugin to write to the database
|
//use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
||||||
username, password, host, port, dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)
|
||||||
p := newSQL()
|
p := newSQL()
|
||||||
p.Log = testutil.Logger{}
|
p.Log = testutil.Logger{}
|
||||||
|
|
@ -311,7 +292,7 @@ func TestPostgresIntegration(t *testing.T) {
|
||||||
//dump the database
|
//dump the database
|
||||||
//psql -u postgres
|
//psql -u postgres
|
||||||
var rc int
|
var rc int
|
||||||
rc, err = cont.Exec(ctx, []string{
|
rc, err = container.Exec([]string{
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
"pg_dump" +
|
"pg_dump" +
|
||||||
|
|
@ -359,37 +340,30 @@ func TestClickHouseIntegration(t *testing.T) {
|
||||||
|
|
||||||
outDir := t.TempDir()
|
outDir := t.TempDir()
|
||||||
|
|
||||||
ctx := context.Background()
|
servicePort := "9000"
|
||||||
req := testcontainers.GenericContainerRequest{
|
container := testutil.Container{
|
||||||
ContainerRequest: testcontainers.ContainerRequest{
|
Image: "yandex/clickhouse-server",
|
||||||
Image: "yandex/clickhouse-server",
|
ExposedPorts: []string{servicePort, "8123"},
|
||||||
BindMounts: map[string]string{
|
BindMounts: map[string]string{
|
||||||
"/docker-entrypoint-initdb.d": initdb,
|
"/docker-entrypoint-initdb.d": initdb,
|
||||||
"/out": outDir,
|
"/out": outDir,
|
||||||
},
|
|
||||||
ExposedPorts: []string{"9000/tcp", "8123/tcp"},
|
|
||||||
WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"),
|
|
||||||
},
|
},
|
||||||
Started: true,
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")),
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
cont, err := testcontainers.GenericContainer(ctx, req)
|
err = container.Start()
|
||||||
require.NoError(t, err, "starting container failed")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, cont.Terminate(ctx), "terminating container failed")
|
require.NoError(t, container.Terminate(), "terminating container failed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the connection details from the container
|
|
||||||
host, err := cont.Host(ctx)
|
|
||||||
require.NoError(t, err, "getting container host address failed")
|
|
||||||
require.NotEmpty(t, host)
|
|
||||||
natPort, err := cont.MappedPort(ctx, "9000/tcp")
|
|
||||||
require.NoError(t, err, "getting container host port failed")
|
|
||||||
port := natPort.Port()
|
|
||||||
require.NotEmpty(t, port)
|
|
||||||
|
|
||||||
//use the plugin to write to the database
|
//use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v", host, port, username, dbname)
|
address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v",
|
||||||
|
container.Address, container.Ports[servicePort], username, dbname)
|
||||||
p := newSQL()
|
p := newSQL()
|
||||||
p.Log = testutil.Logger{}
|
p.Log = testutil.Logger{}
|
||||||
p.Driver = "clickhouse"
|
p.Driver = "clickhouse"
|
||||||
|
|
@ -410,7 +384,7 @@ func TestClickHouseIntegration(t *testing.T) {
|
||||||
// dump the database
|
// dump the database
|
||||||
var rc int
|
var rc int
|
||||||
for _, testMetric := range testMetrics {
|
for _, testMetric := range testMetrics {
|
||||||
rc, err = cont.Exec(ctx, []string{
|
rc, err = container.Exec([]string{
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
"clickhouse-client" +
|
"clickhouse-client" +
|
||||||
|
|
|
||||||
|
|
@ -114,6 +114,10 @@ func (c *Container) LookupMappedPorts() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Container) Exec(cmds []string) (int, error) {
|
||||||
|
return c.container.Exec(c.ctx, cmds)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Container) PrintLogs() {
|
func (c *Container) PrintLogs() {
|
||||||
fmt.Println("--- Container Logs Start ---")
|
fmt.Println("--- Container Logs Start ---")
|
||||||
for _, msg := range c.Logs.Msgs {
|
for _, msg := range c.Logs.Msgs {
|
||||||
|
|
@ -123,15 +127,17 @@ func (c *Container) PrintLogs() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Container) Terminate() error {
|
func (c *Container) Terminate() error {
|
||||||
err := c.container.Terminate(c.ctx)
|
err := c.container.StopLogProducer()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.container.Terminate(c.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("failed to terminate the container: %s", err)
|
fmt.Printf("failed to terminate the container: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// this needs to happen after the container is terminated otherwise there
|
|
||||||
// is a huge time penalty on the order of 50% increase in test time
|
|
||||||
_ = c.container.StopLogProducer()
|
|
||||||
c.PrintLogs()
|
c.PrintLogs()
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue