feat(inputs.kafka_consumer): Implement startup error behavior options (#15919)

Sven Rebhan 2024-10-03 18:41:20 +02:00 committed by GitHub
parent 8313886920
commit cb571deb41
4 changed files with 306 additions and 85 deletions

View File

@@ -23,6 +23,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->
In addition to the plugin-specific and global configuration settings, the
plugin supports options for specifying its behavior on startup errors through
the `startup_error_behavior` setting; a configuration sketch follows the list.
Available values are:

- `error`: Telegraf will stop and exit in case of startup errors. This is the
  default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin, disable it,
  and continue processing all other plugins.
- `retry`: Telegraf will try to start the plugin on every gather or write
  cycle in case of startup errors. The plugin remains disabled until startup
  succeeds.
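For example, a minimal sketch of keeping Telegraf running when the broker is
down at startup and retrying this plugin on each cycle (broker address and
topic are placeholders):

```toml
[[inputs.kafka_consumer]]
  ## Placeholder broker and topic for illustration only
  brokers = ["localhost:9092"]
  topics = ["telegraf"]

  ## Retry connecting on every gather cycle instead of exiting on a
  ## startup error (the default behavior is "error")
  startup_error_behavior = "retry"
```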
## Secret-store support
This plugin supports secrets from secret-stores for the `sasl_username`,
@@ -161,13 +175,6 @@ to use them.
## This list of hostnames then replaces the original address list.
## resolve_canonical_bootstrap_servers_only = false
## Strategy for making connection to kafka brokers. Valid options: "startup",
## "defer". If set to "defer" the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when telegraf is
## started, but if there are any typos in the broker setting, they will cause
## connection failures without warning at startup
# connection_strategy = "startup"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
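This commit deprecates `connection_strategy` in favor of the agent-level
option (see the plugin source change below). A rough migration sketch, under
the assumption that `startup_error_behavior = "retry"` is the closest analog
to the old deferred connection:

```toml
## Before (deprecated): allow the plugin to start without a connection
# [[inputs.kafka_consumer]]
#   connection_strategy = "defer"

## After: let the agent retry the plugin startup in every gather cycle
[[inputs.kafka_consumer]]
  startup_error_behavior = "retry"
```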

View File

@@ -4,6 +4,7 @@ package kafka_consumer
import (
    "context"
    _ "embed"
    "errors"
    "fmt"
    "regexp"
    "sort"
@@ -51,7 +52,7 @@ type KafkaConsumer struct {
    MsgHeaderAsMetricName                string          `toml:"msg_header_as_metric_name"`
    TimestampSource                      string          `toml:"timestamp_source"`
    ConsumerFetchDefault                 config.Size     `toml:"consumer_fetch_default"`
    ConnectionStrategy                   string          `toml:"connection_strategy"`
    ConnectionStrategy                   string          `toml:"connection_strategy" deprecated:"1.33.0;1.40.0;use 'startup_error_behavior' instead"`
    ResolveCanonicalBootstrapServersOnly bool            `toml:"resolve_canonical_bootstrap_servers_only"`
    Log                                  telegraf.Logger `toml:"-"`
    kafka.ReadConfig
@@ -307,7 +308,10 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
    if k.ConnectionStrategy != "defer" {
        err = k.create()
        if err != nil {
            return fmt.Errorf("create consumer: %w", err)
            return &internal.StartupError{
                Err:   fmt.Errorf("create consumer: %w", err),
                Retry: errors.Is(err, sarama.ErrOutOfBrokers),
            }
        }
        k.startErrorAdder(acc)
    }
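The `Retry` flag is what lets the agent distinguish a transient broker outage
(`sarama.ErrOutOfBrokers`) from a permanent misconfiguration. A minimal sketch
of how a caller can react to such an error, assuming only the
`internal.StartupError` type shown above; `startOnce` is a hypothetical helper
for illustration, not the agent's actual code:

```go
package main

import (
    "errors"
    "fmt"

    "github.com/influxdata/telegraf/internal"
)

// startOnce is a hypothetical helper (illustration only, not Telegraf's
// agent code): it runs a plugin's Start function once and reports whether
// a failure is worth retrying in a later gather cycle.
func startOnce(start func() error) (retryLater bool, err error) {
    if err := start(); err != nil {
        var startupErr *internal.StartupError
        if errors.As(err, &startupErr) && startupErr.Retry {
            // Transient failure such as sarama.ErrOutOfBrokers: keep the
            // plugin disabled for now and try again next cycle.
            return true, nil
        }
        // Anything else matches the default "error" behavior: fatal.
        return false, fmt.Errorf("fatal startup error: %w", err)
    }
    return false, nil
}

func main() {
    retry, err := startOnce(func() error {
        return &internal.StartupError{
            Err:   errors.New("client has run out of available brokers"),
            Retry: true,
        }
    })
    fmt.Println(retry, err) // prints: true <nil>
}
```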

View File

@@ -10,11 +10,14 @@ import (
    "github.com/IBM/sarama"
    "github.com/stretchr/testify/require"
    "github.com/testcontainers/testcontainers-go"
    kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/config"
    "github.com/influxdata/telegraf/internal"
    "github.com/influxdata/telegraf/metric"
    "github.com/influxdata/telegraf/models"
    "github.com/influxdata/telegraf/plugins/common/kafka"
    "github.com/influxdata/telegraf/plugins/common/tls"
    "github.com/influxdata/telegraf/plugins/outputs"
@@ -291,7 +294,7 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
    return c.messages
}
func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
func TestConsumerGroupHandlerLifecycle(t *testing.T) {
    acc := &testutil.Accumulator{}
    parser := value.Parser{
@@ -325,7 +328,7 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
    require.NoError(t, err)
}
func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
func TestConsumerGroupHandlerConsumeClaim(t *testing.T) {
    acc := &testutil.Accumulator{}
    parser := value.Parser{
        MetricName: "cpu",
@@ -376,7 +379,7 @@ func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
    testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
func TestConsumerGroupHandler_Handle(t *testing.T) {
func TestConsumerGroupHandlerHandle(t *testing.T) {
    tests := []struct {
        name          string
        maxMessageLen int
@@ -471,6 +474,94 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
    }
}
func TestExponentialBackoff(t *testing.T) {
    var err error
    backoff := 10 * time.Millisecond
    limit := 3

    // get an unused port by listening on next available port, then closing it
    listener, err := net.Listen("tcp", "127.0.0.1:0")
    require.NoError(t, err)
    port := listener.Addr().(*net.TCPAddr).Port
    require.NoError(t, listener.Close())

    // try to connect to kafka on that unused port
    brokers := []string{
        fmt.Sprintf("localhost:%d", port),
    }

    input := KafkaConsumer{
        Brokers:                brokers,
        Log:                    testutil.Logger{},
        Topics:                 []string{"topic"},
        MaxUndeliveredMessages: 1,
        ReadConfig: kafka.ReadConfig{
            Config: kafka.Config{
                MetadataRetryMax:     limit,
                MetadataRetryBackoff: config.Duration(backoff),
                MetadataRetryType:    "exponential",
            },
        },
    }
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())
    input.SetParser(parser)

    // time how long initialization (connection) takes
    start := time.Now()
    require.NoError(t, input.Init())

    acc := testutil.Accumulator{}
    require.Error(t, input.Start(&acc))
    elapsed := time.Since(start)
    t.Logf("elapsed %d", elapsed)

    var expectedRetryDuration time.Duration
    for i := 0; i < limit; i++ {
        expectedRetryDuration += backoff * time.Duration(math.Pow(2, float64(i)))
    }
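    // with backoff = 10ms and limit = 3 this sums to 10 + 20 + 40 = 70ms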
t.Logf("expected > %d", expectedRetryDuration)
// Other than the expected retry delay, initializing and starting the
// plugin, including initializing a sarama consumer takes some time.
//
// It would be nice to check that the actual time is within an expected
// range, but we don't know how long the non-retry time should be.
//
// For now, just check that elapsed time isn't shorter than we expect the
// retry delays to be
require.GreaterOrEqual(t, elapsed, expectedRetryDuration)
input.Stop()
}
func TestExponentialBackoffDefault(t *testing.T) {
    input := KafkaConsumer{
        Brokers:                []string{"broker"},
        Log:                    testutil.Logger{},
        Topics:                 []string{"topic"},
        MaxUndeliveredMessages: 1,
        ReadConfig: kafka.ReadConfig{
            Config: kafka.Config{
                MetadataRetryType: "exponential",
            },
        },
    }
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())
    input.SetParser(parser)
    require.NoError(t, input.Init())

    // We don't need to start the plugin here since we're only testing
    // initialization.

    // if input.MetadataRetryBackoff isn't set, it should be 250 ms
    require.Equal(t, input.MetadataRetryBackoff, config.Duration(250*time.Millisecond))
}
func TestKafkaRoundTripIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
@@ -625,90 +716,216 @@ func TestKafkaTimestampSourceIntegration(t *testing.T) {
    }
}
func TestStartupErrorBehaviorErrorIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }

    // Startup the container
    ctx := context.Background()
    container, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
    require.NoError(t, err)
    defer container.Terminate(ctx) //nolint:errcheck // ignored

    brokers, err := container.Brokers(ctx)
    require.NoError(t, err)

    // Pause the container for simulating connectivity issues
    containerID := container.GetContainerID()
    provider, err := testcontainers.NewDockerProvider()
    require.NoError(t, err)
    require.NoError(t, provider.Client().ContainerPause(ctx, containerID))
    //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
    defer provider.Client().ContainerUnpause(ctx, containerID)

    // Setup the plugin and connect to the broker
    plugin := &KafkaConsumer{
        Brokers:                brokers,
        Log:                    testutil.Logger{},
        Topics:                 []string{"test"},
        MaxUndeliveredMessages: 1,
    }
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())
    plugin.SetParser(parser)

    // Create a model to be able to use the startup retry strategy
    model := models.NewRunningInput(
        plugin,
        &models.InputConfig{
            Name:  "kafka_consumer",
            Alias: "error-test",
        },
    )
    model.StartupErrors.Set(0)
    require.NoError(t, model.Init())

    // Speed up test
    plugin.config.Net.DialTimeout = 100 * time.Millisecond
    plugin.config.Net.WriteTimeout = 100 * time.Millisecond
    plugin.config.Net.ReadTimeout = 100 * time.Millisecond

    // Starting the plugin will fail with an error because the container is paused.
    var acc testutil.Accumulator
    require.ErrorContains(t, model.Start(&acc), "client has run out of available brokers to talk to")
}

func TestStartupErrorBehaviorIgnoreIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }

    // Startup the container
    ctx := context.Background()
    container, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
    require.NoError(t, err)
    defer container.Terminate(ctx) //nolint:errcheck // ignored

    brokers, err := container.Brokers(ctx)
    require.NoError(t, err)

    // Pause the container for simulating connectivity issues
    containerID := container.GetContainerID()
    provider, err := testcontainers.NewDockerProvider()
    require.NoError(t, err)
    require.NoError(t, provider.Client().ContainerPause(ctx, containerID))
    //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
    defer provider.Client().ContainerUnpause(ctx, containerID)

    // Setup the plugin and connect to the broker
    plugin := &KafkaConsumer{
        Brokers:                brokers,
        Log:                    testutil.Logger{},
        Topics:                 []string{"test"},
        MaxUndeliveredMessages: 1,
    }
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())
    plugin.SetParser(parser)

    // Create a model to be able to use the startup retry strategy
    model := models.NewRunningInput(
        plugin,
        &models.InputConfig{
            Name:                 "kafka_consumer",
            Alias:                "ignore-test",
            StartupErrorBehavior: "ignore",
        },
    )
    model.StartupErrors.Set(0)
    require.NoError(t, model.Init())

    // Speed up test
    plugin.config.Net.DialTimeout = 100 * time.Millisecond
    plugin.config.Net.WriteTimeout = 100 * time.Millisecond
    plugin.config.Net.ReadTimeout = 100 * time.Millisecond

    // Starting the plugin will fail because the container is paused.
    // The model code should convert it to a fatal error for the agent to remove
    // the plugin.
    var acc testutil.Accumulator
    err = model.Start(&acc)
    require.ErrorContains(t, err, "client has run out of available brokers to talk to")
    var fatalErr *internal.FatalError
    require.ErrorAs(t, err, &fatalErr)
}
func TestStartupErrorBehaviorRetryIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test in short mode")
    }

    // Startup the container
    ctx := context.Background()
    container, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
    require.NoError(t, err)
    defer container.Terminate(ctx) //nolint:errcheck // ignored

    brokers, err := container.Brokers(ctx)
    require.NoError(t, err)

    // Pause the container for simulating connectivity issues
    containerID := container.GetContainerID()
    provider, err := testcontainers.NewDockerProvider()
    require.NoError(t, err)
    require.NoError(t, provider.Client().ContainerPause(ctx, containerID))
    //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
    defer provider.Client().ContainerUnpause(ctx, containerID)

    // Setup the plugin and connect to the broker
    plugin := &KafkaConsumer{
        Brokers:                brokers,
        Log:                    testutil.Logger{},
        Topics:                 []string{"test"},
        MaxUndeliveredMessages: 1,
    }
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())
    plugin.SetParser(parser)

    // Create a model to be able to use the startup retry strategy
    model := models.NewRunningInput(
        plugin,
        &models.InputConfig{
            Name:                 "kafka_consumer",
            Alias:                "retry-test",
            StartupErrorBehavior: "retry",
        },
    )
    model.StartupErrors.Set(0)
    require.NoError(t, model.Init())

    // Speed up test
    plugin.config.Net.DialTimeout = 100 * time.Millisecond
    plugin.config.Net.WriteTimeout = 100 * time.Millisecond
    plugin.config.Net.ReadTimeout = 100 * time.Millisecond

    // Starting the plugin will not fail but should retry to connect in every gather cycle
    var acc testutil.Accumulator
    require.NoError(t, model.Start(&acc))
    require.EqualValues(t, 1, model.StartupErrors.Get())

    // There should be no metrics as the plugin is not fully started up yet
    require.Empty(t, acc.GetTelegrafMetrics())
    require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected)
    require.Equal(t, int64(2), model.StartupErrors.Get())

    // Unpause the container, now writes should succeed
    require.NoError(t, provider.Client().ContainerUnpause(ctx, containerID))
    require.NoError(t, model.Gather(&acc))
    defer model.Stop()
    require.Equal(t, int64(2), model.StartupErrors.Get())

    // Setup a writer
    creator := outputs.Outputs["kafka"]
    output, ok := creator().(*kafkaOutput.Kafka)
    require.True(t, ok)

    s := &influxSerializer.Serializer{}
    require.NoError(t, s.Init())
    output.SetSerializer(s)
    output.Brokers = brokers
    output.Topic = "test"
    output.Log = &testutil.Logger{}
    require.NoError(t, output.Init())
    require.NoError(t, output.Connect())
    defer output.Close()

    // Send some data to the broker so we have something to receive
    metrics := []telegraf.Metric{
        metric.New(
            "test",
            map[string]string{},
            map[string]interface{}{"value": 42},
            time.Unix(1704067200, 0),
        ),
    }
    require.NoError(t, output.Write(metrics))

    // Verify that the metrics were actually written
    require.Eventually(t, func() bool {
        return acc.NMetrics() >= 1
    }, 3*time.Second, 100*time.Millisecond)
    testutil.RequireMetricsEqual(t, metrics, acc.GetTelegrafMetrics())
}

View File

@@ -124,13 +124,6 @@
## This list of hostnames then replaces the original address list.
## resolve_canonical_bootstrap_servers_only = false
## Strategy for making connection to kafka brokers. Valid options: "startup",
## "defer". If set to "defer" the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when telegraf is
## started, but if there are any typos in the broker setting, they will cause
## connection failures without warning at startup
# connection_strategy = "startup"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000