From 7d9f09ddc8436d173d611d86f23a2f05860d2138 Mon Sep 17 00:00:00 2001
From: reimda
Date: Tue, 25 Oct 2022 07:50:59 -0600
Subject: [PATCH] chore: add kafka round trip integration test using
 inputs.kafka_consumer and outputs.kafka (#12058)

---
 .../inputs/kafka_consumer/kafka_consumer.go   |  10 +-
 .../kafka_consumer/kafka_consumer_test.go     | 113 ++++++++++++++++++
 2 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index a72513997..454cb9160 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -98,7 +98,7 @@ func (k *KafkaConsumer) Init() error {
 	cfg.Version = sarama.V0_10_2_0
 
 	if err := k.SetConfig(cfg); err != nil {
-		return err
+		return fmt.Errorf("SetConfig: %w", err)
 	}
 
 	switch strings.ToLower(k.Offset) {
@@ -143,7 +143,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 		k.config,
 	)
 	if err != nil {
-		return err
+		return fmt.Errorf("create consumer: %w", err)
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -159,7 +159,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 			handler.TopicTag = k.TopicTag
 			err := k.consumer.Consume(ctx, k.Topics, handler)
 			if err != nil {
-				acc.AddError(err)
+				acc.AddError(fmt.Errorf("consume: %w", err))
 				// Ignore returned error as we cannot do anything about it anyway
 				//nolint:errcheck,revive
 				internal.SleepContext(ctx, reconnectDelay)
@@ -167,7 +167,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 		}
 		err = k.consumer.Close()
 		if err != nil {
-			acc.AddError(err)
+			acc.AddError(fmt.Errorf("close: %w", err))
 		}
 	}()
 
@@ -175,7 +175,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 	go func() {
 		defer k.wg.Done()
 		for err := range k.consumer.Errors() {
-			acc.AddError(err)
+			acc.AddError(fmt.Errorf("channel: %w", err))
 		}
 	}()
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 6fcd15ff6..4cfd6e296 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -2,17 +2,24 @@ package kafka_consumer
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/require"
+	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/wait"
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/config"
 	"github.com/influxdata/telegraf/plugins/common/kafka"
 	"github.com/influxdata/telegraf/plugins/common/tls"
+	"github.com/influxdata/telegraf/plugins/outputs"
+	kafkaOutput "github.com/influxdata/telegraf/plugins/outputs/kafka"
+	"github.com/influxdata/telegraf/plugins/parsers/influx"
 	"github.com/influxdata/telegraf/plugins/parsers/value"
+	"github.com/influxdata/telegraf/plugins/serializers"
 	"github.com/influxdata/telegraf/testutil"
 )
 
@@ -461,3 +468,109 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
 		})
 	}
 }
+
+func TestKafkaRoundTripIntegration(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	t.Logf("rt: starting network")
+	ctx := context.Background()
+	networkName := "telegraf-test-kafka-consumer-network"
+	net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
+		NetworkRequest: testcontainers.NetworkRequest{
+			Name:           networkName,
+			Attachable:     true,
+			CheckDuplicate: true,
+		},
+	})
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, net.Remove(ctx), "terminating network failed")
+	}()
+
+	t.Logf("rt: starting zookeeper")
+	zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
+	zookeeper := testutil.Container{
+		Image:        "wurstmeister/zookeeper",
+		ExposedPorts: []string{"2181:2181"},
+		Networks:     []string{networkName},
+		WaitingFor:   wait.ForLog("binding to port"),
+		Name:         zookeeperName,
+	}
+	err = zookeeper.Start()
+	require.NoError(t, err, "failed to start container")
+	defer func() {
+		require.NoError(t, zookeeper.Terminate(), "terminating container failed")
+	}()
+
+	t.Logf("rt: starting broker")
+	topic := "Test"
+	container := testutil.Container{
+		Name:         "telegraf-test-kafka-consumer",
+		Image:        "wurstmeister/kafka",
+		ExposedPorts: []string{"9092:9092"},
+		Env: map[string]string{
+			"KAFKA_ADVERTISED_HOST_NAME": "localhost",
+			"KAFKA_ADVERTISED_PORT":      "9092",
+			"KAFKA_ZOOKEEPER_CONNECT":    fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
+			"KAFKA_CREATE_TOPICS":        fmt.Sprintf("%s:1:1", topic),
+		},
+		Networks:   []string{networkName},
+		WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
+	}
+	err = container.Start()
+	require.NoError(t, err, "failed to start container")
+	defer func() {
+		require.NoError(t, container.Terminate(), "terminating container failed")
+	}()
+
+	brokers := []string{
+		fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
+	}
+
+	// Make kafka output
+	t.Logf("rt: starting output plugin")
+	creator := outputs.Outputs["kafka"]
+	output, ok := creator().(*kafkaOutput.Kafka)
+	require.True(t, ok)
+
+	s, _ := serializers.NewInfluxSerializer()
+	output.SetSerializer(s)
+	output.Brokers = brokers
+	output.Topic = topic
+	output.Log = testutil.Logger{}
+
+	require.NoError(t, output.Init())
+	require.NoError(t, output.Connect())
+
+	// Make kafka input
+	t.Logf("rt: starting input plugin")
+	input := KafkaConsumer{
+		Brokers:                brokers,
+		Log:                    testutil.Logger{},
+		Topics:                 []string{topic},
+		MaxUndeliveredMessages: 1,
+	}
+	parser := &influx.Parser{}
+	require.NoError(t, parser.Init())
+	input.SetParser(parser)
+	err = input.Init()
+	require.NoError(t, err)
+
+	acc := testutil.Accumulator{}
+	err = input.Start(&acc)
+	require.NoError(t, err)
+
+	// Shove some metrics through
+	expected := testutil.MockMetrics()
+	t.Logf("rt: writing")
+	require.NoError(t, output.Write(expected))
+
+	// Check that they were received
+	t.Logf("rt: expecting")
+	acc.Wait(len(expected))
+	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
+
+	t.Logf("rt: done")
+}
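
Reviewer note (not part of the patch): the error wrapping added to kafka_consumer.go uses %w, so the new context prefixes ("SetConfig:", "consume:", "close:", "channel:") appear in the accumulated error text while the underlying sarama error still matches errors.Is / errors.As checks. A minimal standalone sketch of that behavior, using sarama.ErrOutOfBrokers purely as an illustrative stand-in for whatever error the consumer might actually return:

	package main

	import (
		"errors"
		"fmt"

		"github.com/Shopify/sarama"
	)

	func main() {
		// Wrap a sarama error the same way the patched plugin does,
		// e.g. acc.AddError(fmt.Errorf("consume: %w", err)).
		wrapped := fmt.Errorf("consume: %w", sarama.ErrOutOfBrokers)

		// Operators see the added context in the error text...
		fmt.Println(wrapped)

		// ...and programmatic checks on the wrapped error keep working.
		fmt.Println(errors.Is(wrapped, sarama.ErrOutOfBrokers)) // true
	}

The new TestKafkaRoundTripIntegration is gated behind testing.Short(), so it is skipped under -short and only exercises the round trip when the integration suite runs with Docker available for testcontainers.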