From 4c4b821aaa22cc48dc91412c10d2ef8e3fb9c7e3 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Fri, 17 Nov 2023 02:47:18 -0700 Subject: [PATCH] test(kafka): Use dynamic ports with integration tests (#14301) --- go.mod | 7 +-- go.sum | 17 +++--- .../kafka_consumer/kafka_consumer_test.go | 54 +++---------------- plugins/outputs/kafka/kafka_test.go | 45 ++++------------ 4 files changed, 31 insertions(+), 92 deletions(-) diff --git a/go.mod b/go.mod index 7b45f6c04..96f433509 100644 --- a/go.mod +++ b/go.mod @@ -165,7 +165,7 @@ require ( github.com/safchain/ethtool v0.3.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/sensu/sensu-go/api/core/v2 v2.16.0 - github.com/shirou/gopsutil/v3 v3.23.9 + github.com/shirou/gopsutil/v3 v3.23.10 github.com/showwin/speedtest-go v1.6.7 github.com/signalfx/golib/v3 v3.3.53 github.com/sirupsen/logrus v1.9.3 @@ -176,6 +176,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 github.com/testcontainers/testcontainers-go v0.26.0 + github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09 github.com/thomasklein94/packer-plugin-libvirt v0.5.0 github.com/tidwall/gjson v1.14.4 github.com/tinylib/msgp v1.1.8 @@ -298,8 +299,8 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect - github.com/eapache/go-resiliency v1.3.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/echlebek/timeproxy v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.10.2 // indirect diff --git a/go.sum b/go.sum index 2094117a3..51b479b14 100644 --- a/go.sum +++ b/go.sum @@ -710,6 +710,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/IBM/nzgo/v12 v12.0.9-0.20231115043259-49c27f2dfe48 h1:TBb4IxmBH0ssmWTUg0C6c9ZnfDmZospTF8f+YbHnbbA= github.com/IBM/nzgo/v12 v12.0.9-0.20231115043259-49c27f2dfe48/go.mod h1:4pvfEkfsrAdqlljsp8HNwv/uzNKy2fzoXBB1aRIssJg= +github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= +github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= @@ -1088,10 +1090,10 @@ github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQx github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0 h1:wHGPJSXvwKQVf/XfhjUPyrhpcPKWNy8F3ikH+eiwoBg= github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0/go.mod h1:PseHFo8Leko7J4A/TfZ6kkHdkzKBLUta6hRZR/OEbbc= -github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= -github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= -github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg= @@ -2077,8 +2079,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod github.com/sensu/sensu-go/api/core/v2 v2.16.0 h1:HOq4rFkQ1S5ZjxmMTLc5J5mAbECrnKWvtXXbMqr3j9s= github.com/sensu/sensu-go/api/core/v2 v2.16.0/go.mod h1:MjM7+MCGEyTAgaZ589SiGHwYiaYF7N/58dU0J070u/0= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= -github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E= -github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA= +github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= +github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= @@ -2172,6 +2174,8 @@ github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62/go.mod h1:qUzPVl github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= github.com/testcontainers/testcontainers-go v0.26.0 h1:uqcYdoOHBy1ca7gKODfBd9uTHVK3a7UL848z09MVZ0c= github.com/testcontainers/testcontainers-go v0.26.0/go.mod h1:ICriE9bLX5CLxL9OFQ2N+2N+f+803LNJ1utJb1+Inx0= +github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09 h1:jqohCgCKphLrxHl6crzKJbmlmo8GYUNpTiw/Ib+AFLo= +github.com/testcontainers/testcontainers-go/modules/kafka v0.26.1-0.20231116140448-68d5f8983d09/go.mod h1:MBqGe6sHltLHRmjk1K1axtIboCjjATh3+oZObcWYFMg= github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs= github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY= github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= @@ -2703,7 +2707,6 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 2498b35ad..2f45b4f41 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -11,7 +11,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" + kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" @@ -484,58 +484,20 @@ func TestKafkaRoundTripIntegration(t *testing.T) { }{ {"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)}, {"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)}, - {"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - t.Logf("rt: starting network") ctx := context.Background() - networkName := "telegraf-test-kafka-consumer-network" - network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ - NetworkRequest: testcontainers.NetworkRequest{ - Name: networkName, - Attachable: true, - CheckDuplicate: true, - }, - }) + kafkaContainer, err := kafkacontainer.RunContainer(ctx, + kafkacontainer.WithClusterID("test-cluster"), + testcontainers.WithImage("confluentinc/confluent-local:7.5.0"), + ) require.NoError(t, err) - defer func() { - require.NoError(t, network.Remove(ctx), "terminating network failed") - }() + defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored - t.Logf("rt: starting zookeeper") - zookeeperName := "telegraf-test-kafka-consumer-zookeeper" - zookeeper := testutil.Container{ - Image: "wurstmeister/zookeeper", - ExposedPorts: []string{"2181:2181"}, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("binding to port"), - Name: zookeeperName, - } - require.NoError(t, zookeeper.Start(), "failed to start container") - defer zookeeper.Terminate() - - t.Logf("rt: starting broker") - container := testutil.Container{ - Name: "telegraf-test-kafka-consumer", - Image: "wurstmeister/kafka", - ExposedPorts: []string{"9092:9092"}, - Env: map[string]string{ - "KAFKA_ADVERTISED_HOST_NAME": "localhost", - "KAFKA_ADVERTISED_PORT": "9092", - "KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]), - "KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"), - }, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"), - } - require.NoError(t, container.Start(), "failed to start container") - defer container.Terminate() - - brokers := []string{ - fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), - } + brokers, err := kafkaContainer.Brokers(ctx) + require.NoError(t, err) // Make kafka output t.Logf("rt: starting output plugin") diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 57608811f..5a23aeefd 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -6,10 +6,9 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" + kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" @@ -28,45 +27,19 @@ func TestConnectAndWriteIntegration(t *testing.T) { } ctx := context.Background() - t.Log("creating test network") - networkName := "telegraf-test-output-kafka-network" - network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ - NetworkRequest: testcontainers.NetworkRequest{ - Name: networkName, - Attachable: true, - CheckDuplicate: true, - }, - }) + kafkaContainer, err := kafkacontainer.RunContainer(ctx, + kafkacontainer.WithClusterID("test-cluster"), + testcontainers.WithImage("confluentinc/confluent-local:7.5.0"), + ) require.NoError(t, err) - defer func() { - require.NoError(t, network.Remove(ctx), "terminating network failed") - }() + defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored - // Start the container as broker AND controller - container := testutil.Container{ - Image: "bitnami/kafka", - Hostname: "localhost", // required to be able to resolve the name - Networks: []string{networkName}, - ExposedPorts: []string{"9092:9092", "9093:9093"}, - Env: map[string]string{ - "KAFKA_CFG_NODE_ID": "0", - "KAFKA_CFG_PROCESS_ROLES": "controller,broker", - "KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093", - "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT", - "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@localhost:9093", - "KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER", - }, - WaitingFor: wait.ForAll( - wait.ForListeningPort(nat.Port("9092")), - wait.ForLog("Kafka Server started"), - ), - } - require.NoError(t, container.Start(), "failed to start container") - defer container.Terminate() + brokers, err := kafkaContainer.Brokers(ctx) + require.NoError(t, err) // Setup the plugin plugin := &Kafka{ - Brokers: []string{container.Address + ":" + container.Ports["9092"]}, + Brokers: brokers, Topic: "Test", Log: testutil.Logger{}, producerFunc: sarama.NewSyncProducer,