chore: add kafka round trip integration test using inputs.kafka_consumer and outputs.kafka (#12058)

This commit is contained in:
reimda 2022-10-25 07:50:59 -06:00 committed by GitHub
parent 284edccf92
commit 7d9f09ddc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 5 deletions

View File

@ -98,7 +98,7 @@ func (k *KafkaConsumer) Init() error {
cfg.Version = sarama.V0_10_2_0
if err := k.SetConfig(cfg); err != nil {
return err
return fmt.Errorf("SetConfig: %w", err)
}
switch strings.ToLower(k.Offset) {
@ -143,7 +143,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
k.config,
)
if err != nil {
return err
return fmt.Errorf("create consumer: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
@ -159,7 +159,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler.TopicTag = k.TopicTag
err := k.consumer.Consume(ctx, k.Topics, handler)
if err != nil {
acc.AddError(err)
acc.AddError(fmt.Errorf("consume: %w", err))
// Ignore returned error as we cannot do anything about it anyway
//nolint:errcheck,revive
internal.SleepContext(ctx, reconnectDelay)
@ -167,7 +167,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
}
err = k.consumer.Close()
if err != nil {
acc.AddError(err)
acc.AddError(fmt.Errorf("close: %w", err))
}
}()
@ -175,7 +175,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
go func() {
defer k.wg.Done()
for err := range k.consumer.Errors() {
acc.AddError(err)
acc.AddError(fmt.Errorf("channel: %w", err))
}
}()

View File

@ -2,17 +2,24 @@ package kafka_consumer
import (
"context"
"fmt"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
kafkaOutput "github.com/influxdata/telegraf/plugins/outputs/kafka"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
@ -461,3 +468,109 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
})
}
}
func TestKafkaRoundTripIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
t.Logf("rt: starting zookeeper")
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: zookeeperName,
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
t.Logf("rt: starting broker")
topic := "Test"
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", topic),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
// Make kafka output
t.Logf("rt: starting output plugin")
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s, _ := serializers.NewInfluxSerializer()
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = topic
output.Log = testutil.Logger{}
require.NoError(t, output.Init())
require.NoError(t, output.Connect())
// Make kafka input
t.Logf("rt: starting input plugin")
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: []string{topic},
MaxUndeliveredMessages: 1,
}
parser := &influx.Parser{}
parser.Init()
input.SetParser(parser)
err = input.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
err = input.Start(&acc)
require.NoError(t, err)
// Shove some metrics through
expected := testutil.MockMetrics()
t.Logf("rt: writing")
require.NoError(t, output.Write(expected))
// Check that they were received
t.Logf("rt: expecting")
acc.Wait(len(expected))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
t.Logf("rt: done")
}