test(outputs.kafka): Rework integration test to use bitnami/kafka (#14161)

This commit is contained in:
Sven Rebhan 2023-10-23 18:44:17 +02:00 committed by GitHub
parent 189e5a56cc
commit 062326eb48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 58 deletions

View File

@ -133,8 +133,6 @@ func (k *Kafka) Init() error {
return err return err
} }
k.saramaConfig = config
// Legacy support ssl config // Legacy support ssl config
if k.Certificate != "" { if k.Certificate != "" {
k.TLSCert = k.Certificate k.TLSCert = k.Certificate
@ -151,6 +149,7 @@ func (k *Kafka) Init() error {
} }
config.Net.Proxy.Dialer = dialer config.Net.Proxy.Dialer = dialer
} }
k.saramaConfig = config
return nil return nil
} }

View File

@ -1,14 +1,12 @@
package kafka package kafka
import ( import (
"context"
"fmt"
"testing" "testing"
"time" "time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -27,73 +25,47 @@ func TestConnectAndWriteIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
ctx := context.Background() // Start the container as broker AND controller
networkName := "kafka-test-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: "telegraf-test-zookeeper",
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer zookeeper.Terminate()
container := testutil.Container{ container := testutil.Container{
Image: "wurstmeister/kafka", Image: "bitnami/kafka",
ExposedPorts: []string{"9092:9092"}, Hostname: "localhost", // required to be able to resolve the name
ExposedPorts: []string{"9092:9092", "9093:9093"},
Env: map[string]string{ Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost", "KAFKA_CFG_NODE_ID": "0",
"KAFKA_ADVERTISED_PORT": "9092", "KAFKA_CFG_PROCESS_ROLES": "controller,broker",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("telegraf-test-zookeeper:%s", zookeeper.Ports["2181"]), "KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093",
"KAFKA_CREATE_TOPICS": "Test:1:1", "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@localhost:9093",
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
}, },
Networks: []string{networkName}, WaitingFor: wait.ForAll(
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"), wait.ForListeningPort(nat.Port("9092")),
wait.ForLog("Kafka Server started"),
),
} }
err = container.Start() require.NoError(t, container.Start(), "failed to start container")
require.NoError(t, err, "failed to start container")
defer container.Terminate() defer container.Terminate()
brokers := []string{ // Setup the plugin
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), plugin := &Kafka{
} Brokers: []string{container.Address + ":" + container.Ports["9092"]},
s := &influx.Serializer{}
require.NoError(t, s.Init())
k := &Kafka{
Brokers: brokers,
Topic: "Test", Topic: "Test",
Log: testutil.Logger{}, Log: testutil.Logger{},
serializer: s,
producerFunc: sarama.NewSyncProducer, producerFunc: sarama.NewSyncProducer,
} }
// Setup the metric serializer
s := &influx.Serializer{}
require.NoError(t, s.Init())
plugin.SetSerializer(s)
// Verify that we can connect to the Kafka broker // Verify that we can connect to the Kafka broker
err = k.Init() require.NoError(t, plugin.Init())
require.NoError(t, err) require.NoError(t, plugin.Connect())
err = k.Connect() defer plugin.Close()
require.NoError(t, err)
// Verify that we can successfully write data to the kafka broker // Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics()) require.NoError(t, plugin.Write(testutil.MockMetrics()))
require.NoError(t, err)
err = k.Close()
require.NoError(t, err)
} }
func TestTopicSuffixes(t *testing.T) { func TestTopicSuffixes(t *testing.T) {

View File

@ -31,6 +31,7 @@ type Container struct {
Cmd []string Cmd []string
Image string Image string
Name string Name string
Hostname string
Networks []string Networks []string
WaitingFor wait.Strategy WaitingFor wait.Strategy
@ -60,6 +61,7 @@ func (c *Container) Start() error {
Cmd: c.Cmd, Cmd: c.Cmd,
Image: c.Image, Image: c.Image,
Name: c.Name, Name: c.Name,
Hostname: c.Hostname,
Networks: c.Networks, Networks: c.Networks,
WaitingFor: c.WaitingFor, WaitingFor: c.WaitingFor,
}, },