test: migrate kafka to testcontainers (#11206)

This commit is contained in:
Joshua Powers 2022-06-02 07:47:44 -06:00 committed by GitHub
parent 6699e114a4
commit 6e021b26ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 53 additions and 10 deletions

View File

@ -1,11 +1,15 @@
package kafka
import (
"context"
"fmt"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
@ -23,7 +27,54 @@ func TestConnectAndWriteIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
brokers := []string{testutil.GetLocalHost() + ":9092"}
ctx := context.Background()
networkName := "kafka-test-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: "telegraf-test-zookeeper",
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
container := testutil.Container{
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("telegraf-test-zookeeper:%s", zookeeper.Ports["2181"]),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("[KafkaServer id=1001] started"),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
s, _ := serializers.NewInfluxSerializer()
k := &Kafka{
Brokers: brokers,
@ -33,7 +84,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
// Verify that we can connect to the Kafka broker
err := k.Init()
err = k.Init()
require.NoError(t, err)
err = k.Connect()
require.NoError(t, err)
@ -45,10 +96,6 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
func TestTopicSuffixesIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
topic := "Test"
m := testutil.TestMetric(1)
@ -92,10 +139,6 @@ func TestTopicSuffixesIntegration(t *testing.T) {
}
func TestValidateTopicSuffixMethodIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
err := ValidateTopicSuffixMethod("invalid_topic_suffix_method")
require.Error(t, err, "Topic suffix method used should be invalid.")