test(kafka): Use dynamic ports with integration tests (#14301)

This commit is contained in:
Joshua Powers 2023-11-17 02:47:18 -07:00 committed by GitHub
parent 64f56b4cb5
commit 4c4b821aaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 92 deletions

7
go.mod
View File

@ -165,7 +165,7 @@ require (
github.com/safchain/ethtool v0.3.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/sensu/sensu-go/api/core/v2 v2.16.0
github.com/shirou/gopsutil/v3 v3.23.9
github.com/shirou/gopsutil/v3 v3.23.10
github.com/showwin/speedtest-go v1.6.7
github.com/signalfx/golib/v3 v3.3.53
github.com/sirupsen/logrus v1.9.3
@ -176,6 +176,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62
github.com/testcontainers/testcontainers-go v0.26.0
github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09
github.com/thomasklein94/packer-plugin-libvirt v0.5.0
github.com/tidwall/gjson v1.14.4
github.com/tinylib/msgp v1.1.8
@ -298,8 +299,8 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/echlebek/timeproxy v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect

17
go.sum
View File

@ -710,6 +710,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/IBM/nzgo/v12 v12.0.9-0.20231115043259-49c27f2dfe48 h1:TBb4IxmBH0ssmWTUg0C6c9ZnfDmZospTF8f+YbHnbbA=
github.com/IBM/nzgo/v12 v12.0.9-0.20231115043259-49c27f2dfe48/go.mod h1:4pvfEkfsrAdqlljsp8HNwv/uzNKy2fzoXBB1aRIssJg=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
@ -1088,10 +1090,10 @@ github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQx
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0 h1:wHGPJSXvwKQVf/XfhjUPyrhpcPKWNy8F3ikH+eiwoBg=
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0/go.mod h1:PseHFo8Leko7J4A/TfZ6kkHdkzKBLUta6hRZR/OEbbc=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg=
@ -2077,8 +2079,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod
github.com/sensu/sensu-go/api/core/v2 v2.16.0 h1:HOq4rFkQ1S5ZjxmMTLc5J5mAbECrnKWvtXXbMqr3j9s=
github.com/sensu/sensu-go/api/core/v2 v2.16.0/go.mod h1:MjM7+MCGEyTAgaZ589SiGHwYiaYF7N/58dU0J070u/0=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E=
github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA=
github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM=
github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
@ -2172,6 +2174,8 @@ github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62/go.mod h1:qUzPVl
github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0=
github.com/testcontainers/testcontainers-go v0.26.0 h1:uqcYdoOHBy1ca7gKODfBd9uTHVK3a7UL848z09MVZ0c=
github.com/testcontainers/testcontainers-go v0.26.0/go.mod h1:ICriE9bLX5CLxL9OFQ2N+2N+f+803LNJ1utJb1+Inx0=
github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09 h1:jqohCgCKphLrxHl6crzKJbmlmo8GYUNpTiw/Ib+AFLo=
github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09/go.mod h1:MBqGe6sHltLHRmjk1K1axtIboCjjATh3+oZObcWYFMg=
github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs=
github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY=
github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM=
@ -2703,7 +2707,6 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=

View File

@ -11,7 +11,7 @@ import (
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
@ -484,58 +484,20 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
}{
{"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
{"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
{"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
kafkaContainer, err := kafkacontainer.RunContainer(ctx,
kafkacontainer.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
)
require.NoError(t, err)
defer func() {
require.NoError(t, network.Remove(ctx), "terminating network failed")
}()
defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored
t.Logf("rt: starting zookeeper")
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: zookeeperName,
}
require.NoError(t, zookeeper.Start(), "failed to start container")
defer zookeeper.Terminate()
t.Logf("rt: starting broker")
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
brokers, err := kafkaContainer.Brokers(ctx)
require.NoError(t, err)
// Make kafka output
t.Logf("rt: starting output plugin")

View File

@ -6,10 +6,9 @@ import (
"time"
"github.com/Shopify/sarama"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
@ -28,45 +27,19 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
ctx := context.Background()
t.Log("creating test network")
networkName := "telegraf-test-output-kafka-network"
network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
kafkaContainer, err := kafkacontainer.RunContainer(ctx,
kafkacontainer.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
)
require.NoError(t, err)
defer func() {
require.NoError(t, network.Remove(ctx), "terminating network failed")
}()
defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored
// Start the container as broker AND controller
container := testutil.Container{
Image: "bitnami/kafka",
Hostname: "localhost", // required to be able to resolve the name
Networks: []string{networkName},
ExposedPorts: []string{"9092:9092", "9093:9093"},
Env: map[string]string{
"KAFKA_CFG_NODE_ID": "0",
"KAFKA_CFG_PROCESS_ROLES": "controller,broker",
"KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093",
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@localhost:9093",
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port("9092")),
wait.ForLog("Kafka Server started"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
brokers, err := kafkaContainer.Brokers(ctx)
require.NoError(t, err)
// Setup the plugin
plugin := &Kafka{
Brokers: []string{container.Address + ":" + container.Ports["9092"]},
Brokers: brokers,
Topic: "Test",
Log: testutil.Logger{},
producerFunc: sarama.NewSyncProducer,