From 494f558b4e44925c440db451e44907bddcf500ce Mon Sep 17 00:00:00 2001
From: reimda
Date: Thu, 3 Nov 2022 07:01:22 -0600
Subject: [PATCH] feat: Add exponential backoff when connecting or reconnecting
 and allow plugin to start without making initial connection (#12111)

---
 plugins/common/kafka/config.go                |  58 +++-
 plugins/common/kafka/config_test.go           |  22 ++
 plugins/inputs/kafka_consumer/README.md       |  35 +++
 .../inputs/kafka_consumer/kafka_consumer.go   |  58 +++-
 .../kafka_consumer/kafka_consumer_test.go     | 274 ++++++++++++------
 plugins/inputs/kafka_consumer/sample.conf     |  26 ++
 plugins/outputs/kafka/kafka.go                |   2 +-
 7 files changed, 367 insertions(+), 108 deletions(-)
 create mode 100644 plugins/common/kafka/config_test.go

diff --git a/plugins/common/kafka/config.go b/plugins/common/kafka/config.go
index 1790b289e..69e10b777 100644
--- a/plugins/common/kafka/config.go
+++ b/plugins/common/kafka/config.go
@@ -1,8 +1,14 @@
 package kafka
 
 import (
+	"fmt"
+	"math"
+	"strings"
+	"time"
+
 	"github.com/Shopify/sarama"
 	"github.com/influxdata/telegraf"
+	tgConf "github.com/influxdata/telegraf/config"
 	"github.com/influxdata/telegraf/plugins/common/tls"
 )
 
@@ -12,10 +18,10 @@ type ReadConfig struct {
 }
 
 // SetConfig on the sarama.Config object from the ReadConfig struct.
-func (k *ReadConfig) SetConfig(config *sarama.Config) error {
+func (k *ReadConfig) SetConfig(config *sarama.Config, log telegraf.Logger) error {
 	config.Consumer.Return.Errors = true
 
-	return k.Config.SetConfig(config)
+	return k.Config.SetConfig(config, log)
 }
 
 // WriteConfig for kafka clients meaning to write to kafka
@@ -29,7 +35,7 @@ type WriteConfig struct {
 }
 
 // SetConfig on the sarama.Config object from the WriteConfig struct.
-func (k *WriteConfig) SetConfig(config *sarama.Config) error {
+func (k *WriteConfig) SetConfig(config *sarama.Config, log telegraf.Logger) error {
 	config.Producer.Return.Successes = true
 	config.Producer.Idempotent = k.IdempotentWrites
 	config.Producer.Retry.Max = k.MaxRetry
@@ -40,7 +46,7 @@ func (k *WriteConfig) SetConfig(config *sarama.Config) error {
 	if config.Producer.Idempotent {
 		config.Net.MaxOpenRequests = 1
 	}
-	return k.Config.SetConfig(config)
+	return k.Config.SetConfig(config, log)
 }
 
 // Config common to all Kafka clients.
@@ -53,14 +59,29 @@ type Config struct {
 	CompressionCodec int     `toml:"compression_codec"`
 	EnableTLS        *bool   `toml:"enable_tls"`
 
-	Log telegraf.Logger `toml:"-"`
+	MetadataRetryMax         int             `toml:"metadata_retry_max"`
+	MetadataRetryType        string          `toml:"metadata_retry_type"`
+	MetadataRetryBackoff     tgConf.Duration `toml:"metadata_retry_backoff"`
+	MetadataRetryMaxDuration tgConf.Duration `toml:"metadata_retry_max_duration"`
 
 	// Disable full metadata fetching
 	MetadataFull *bool `toml:"metadata_full"`
 }
 
+type BackoffFunc func(retries, maxRetries int) time.Duration
+
+func makeBackoffFunc(backoff, maxDuration time.Duration) BackoffFunc {
+	return func(retries, maxRetries int) time.Duration {
+		d := time.Duration(math.Pow(2, float64(retries))) * backoff
+		if maxDuration != 0 && d > maxDuration {
+			return maxDuration
+		}
+		return d
+	}
+}
+
 // SetConfig on the sarama.Config object from the Config struct.
-func (k *Config) SetConfig(config *sarama.Config) error {
+func (k *Config) SetConfig(config *sarama.Config, log telegraf.Logger) error {
 	if k.Version != "" {
 		version, err := sarama.ParseKafkaVersion(k.Version)
 		if err != nil {
@@ -102,5 +123,30 @@ func (k *Config) SetConfig(config *sarama.Config) error {
 		config.Metadata.Full = *k.MetadataFull
 	}
 
+	if k.MetadataRetryMax != 0 {
+		config.Metadata.Retry.Max = k.MetadataRetryMax
+	}
+
+	if k.MetadataRetryBackoff != 0 {
+		// If config.Metadata.Retry.BackoffFunc is set, sarama ignores
+		// config.Metadata.Retry.Backoff
+		config.Metadata.Retry.Backoff = time.Duration(k.MetadataRetryBackoff)
+	}
+
+	switch strings.ToLower(k.MetadataRetryType) {
+	default:
+		return fmt.Errorf("invalid metadata retry type %q", k.MetadataRetryType)
+	case "exponential":
+		if k.MetadataRetryBackoff == 0 {
+			k.MetadataRetryBackoff = tgConf.Duration(250 * time.Millisecond)
+			log.Warnf("metadata_retry_backoff is 0, using %s", time.Duration(k.MetadataRetryBackoff))
+		}
+		config.Metadata.Retry.BackoffFunc = makeBackoffFunc(
+			time.Duration(k.MetadataRetryBackoff),
+			time.Duration(k.MetadataRetryMaxDuration),
+		)
+	case "constant", "":
+	}
+
 	return k.SetSASLConfig(config)
 }
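For illustration, a minimal standalone sketch of the delay schedule the formula above produces. The helper below copies makeBackoffFunc's math for demonstration only, with an assumed 250 ms base and a hypothetical 2 s cap; it is not part of the patch.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Copy of the patch's backoff formula, reproduced here for illustration only.
func backoff(retries int, base, maxDuration time.Duration) time.Duration {
	d := time.Duration(math.Pow(2, float64(retries))) * base
	if maxDuration != 0 && d > maxDuration {
		return maxDuration
	}
	return d
}

func main() {
	// With a 250 ms base and a hypothetical 2 s cap the schedule is:
	// retry 0: 250ms, retry 1: 500ms, retry 2: 1s, retry 3: 2s, retry 4: 2s (capped)
	for retry := 0; retry < 5; retry++ {
		fmt.Println(retry, backoff(retry, 250*time.Millisecond, 2*time.Second))
	}
}
```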
diff --git a/plugins/common/kafka/config_test.go b/plugins/common/kafka/config_test.go
new file mode 100644
index 000000000..07989c45f
--- /dev/null
+++ b/plugins/common/kafka/config_test.go
@@ -0,0 +1,22 @@
+package kafka
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestBackoffFunc(t *testing.T) {
+	b := 250 * time.Millisecond
+	max := 1100 * time.Millisecond
+
+	f := makeBackoffFunc(b, max)
+	require.Equal(t, b, f(0, 0))
+	require.Equal(t, b*2, f(1, 0))
+	require.Equal(t, b*4, f(2, 0))
+	require.Equal(t, max, f(3, 0)) // would be 2000 ms, but that's greater than max
+
+	f = makeBackoffFunc(b, 0) // max = 0 means no max
+	require.Equal(t, b*8, f(3, 0)) // with no max, the delay is the full 2000 ms
+}
diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index f2a190003..c1ac1a544 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -90,6 +90,32 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
   # balance_strategy = "range"
 
+  ## Maximum number of retries for metadata operations, including
+  ## connecting. Sets the Sarama library's Metadata.Retry.Max config value. If
+  ## 0 or unset, use the Sarama default of 3.
+  # metadata_retry_max = 0
+
+  ## Type of retry backoff. Valid options: "constant", "exponential"
+  # metadata_retry_type = "constant"
+
+  ## Amount of time to wait before retrying. When metadata_retry_type is
+  ## "constant", each retry is delayed this amount. When "exponential", the
+  ## first retry is delayed this amount, and subsequent delays are doubled. If 0
+  ## or unset, use the Sarama default of 250 ms.
+  # metadata_retry_backoff = 0
+
+  ## Maximum amount of time to wait before retrying when metadata_retry_type is
+  ## "exponential". Ignored for other retry types. If 0, there is no backoff
+  ## limit.
+  # metadata_retry_max_duration = 0
+
+  ## Strategy for making connection to kafka brokers. Valid options: "startup",
+  ## "defer". If set to "defer", the plugin is allowed to start before making a
+  ## connection. This is useful if the broker may be down when telegraf is
+  ## started, but a typo in the broker setting will then cause connection
+  ## failures without any warning at startup.
+  # connection_strategy = "startup"
+
   ## Maximum length of a message to consume, in bytes (default 0/unlimited);
   ## larger messages are dropped
   max_message_len = 1000000
@@ -130,3 +156,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
 [kafka]: https://kafka.apache.org
 [kafka_consumer_legacy]: /plugins/inputs/kafka_consumer_legacy/README.md
 [input data formats]: /docs/DATA_FORMATS_INPUT.md
+
+## Metrics
+
+The plugin accepts arbitrary input and parses it according to the `data_format`
+setting. There is no predefined metric format.
+
+## Example Output
+
+There is no predefined metric format, so the output depends on the plugin's input.
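To make the interaction of metadata_retry_max and metadata_retry_backoff concrete, here is a small sketch of the worst-case wait the exponential schedule adds before a connection attempt finally fails. The settings are hypothetical, and the loop mirrors the expectedRetryDuration calculation in TestExponentialBackoff further down.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Hypothetical settings: metadata_retry_max = 3,
	// metadata_retry_backoff = 250 ms, exponential type with no cap.
	max := 3
	backoff := 250 * time.Millisecond

	// The total worst-case wait before the plugin gives up is the sum of
	// the per-retry delays.
	var total time.Duration
	for i := 0; i < max; i++ {
		total += backoff * time.Duration(math.Pow(2, float64(i)))
	}
	fmt.Println(total) // 250ms + 500ms + 1s = 1.75s
}
```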
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 454cb9160..b80fdeb7f 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -43,6 +43,7 @@ type KafkaConsumer struct {
 	Topics                 []string    `toml:"topics"`
 	TopicTag               string      `toml:"topic_tag"`
 	ConsumerFetchDefault   config.Size `toml:"consumer_fetch_default"`
+	ConnectionStrategy     string      `toml:"connection_strategy"`
 
 	kafka.ReadConfig
 
@@ -97,7 +98,7 @@ func (k *KafkaConsumer) Init() error {
 	// Kafka version 0.10.2.0 is required for consumer groups.
 	cfg.Version = sarama.V0_10_2_0
 
-	if err := k.SetConfig(cfg); err != nil {
+	if err := k.SetConfig(cfg, k.Log); err != nil {
 		return fmt.Errorf("SetConfig: %w", err)
 	}
 
@@ -131,28 +132,67 @@ func (k *KafkaConsumer) Init() error {
 		cfg.Consumer.Fetch.Default = int32(k.ConsumerFetchDefault)
 	}
 
+	switch strings.ToLower(k.ConnectionStrategy) {
+	default:
+		return fmt.Errorf("invalid connection strategy %q", k.ConnectionStrategy)
+	case "defer", "startup", "":
+	}
+
 	k.config = cfg
 	return nil
 }
 
-func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
+func (k *KafkaConsumer) create() error {
 	var err error
 	k.consumer, err = k.ConsumerCreator.Create(
 		k.Brokers,
 		k.ConsumerGroup,
 		k.config,
 	)
-	if err != nil {
-		return fmt.Errorf("create consumer: %w", err)
-	}
+
+	return err
+}
+
+func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
+	k.wg.Add(1)
+	go func() {
+		defer k.wg.Done()
+		for err := range k.consumer.Errors() {
+			acc.AddError(fmt.Errorf("channel: %w", err))
+		}
+	}()
+}
+
+func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
+	var err error
 
 	ctx, cancel := context.WithCancel(context.Background())
 	k.cancel = cancel
 
+	if k.ConnectionStrategy != "defer" {
+		err = k.create()
+		if err != nil {
+			return fmt.Errorf("create consumer: %w", err)
+		}
+		k.startErrorAdder(acc)
+	}
+
 	// Start consumer goroutine
 	k.wg.Add(1)
 	go func() {
+		var err error
 		defer k.wg.Done()
+
+		if k.consumer == nil {
+			err = k.create()
+			if err != nil {
+				acc.AddError(fmt.Errorf("create consumer async: %w", err))
+				return
+			}
+		}
+
+		k.startErrorAdder(acc)
+
 		for ctx.Err() == nil {
 			handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
 			handler.MaxMessageLen = k.MaxMessageLen
@@ -171,14 +211,6 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 		}
 	}()
 
-	k.wg.Add(1)
-	go func() {
-		defer k.wg.Done()
-		for err := range k.consumer.Errors() {
-			acc.AddError(fmt.Errorf("channel: %w", err))
-		}
-	}()
-
 	return nil
 }
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 872e81efc..b79f40c40 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -3,6 +3,8 @@ package kafka_consumer
 import (
 	"context"
 	"fmt"
+	"math"
+	"net"
 	"testing"
 	"time"
 
@@ -226,8 +228,7 @@ func TestStartStop(t *testing.T) {
 	require.NoError(t, err)
 
 	var acc testutil.Accumulator
-	err = plugin.Start(&acc)
-	require.NoError(t, err)
+	require.NoError(t, plugin.Start(&acc))
 
 	plugin.Stop()
 }
@@ -474,108 +475,205 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 		t.Skip("Skipping integration test in short mode")
 	}
 
-	t.Logf("rt: starting network")
-	ctx := context.Background()
-	networkName := "telegraf-test-kafka-consumer-network"
-	net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
-		NetworkRequest: testcontainers.NetworkRequest{
-			Name:           networkName,
-			Attachable:     true,
-			CheckDuplicate: true,
-		},
-	})
+	var tests = []struct {
+		name               string
+		connectionStrategy string
+	}{
+		{"connection strategy startup", "startup"},
+		{"connection strategy defer", "defer"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Logf("rt: starting network")
+			ctx := context.Background()
+			networkName := "telegraf-test-kafka-consumer-network"
+			net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
+				NetworkRequest: testcontainers.NetworkRequest{
+					Name:           networkName,
+					Attachable:     true,
+					CheckDuplicate: true,
+				},
+			})
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, net.Remove(ctx), "terminating network failed")
+			}()
+
+			t.Logf("rt: starting zookeeper")
+			zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
+			zookeeper := testutil.Container{
+				Image:        "wurstmeister/zookeeper",
+				ExposedPorts: []string{"2181:2181"},
+				Networks:     []string{networkName},
+				WaitingFor:   wait.ForLog("binding to port"),
+				Name:         zookeeperName,
+			}
+			require.NoError(t, zookeeper.Start(), "failed to start container")
+			defer func() {
+				require.NoError(t, zookeeper.Terminate(), "terminating container failed")
+			}()
+
+			t.Logf("rt: starting broker")
+			topic := "Test"
+			container := testutil.Container{
+				Name:         "telegraf-test-kafka-consumer",
+				Image:        "wurstmeister/kafka",
+				ExposedPorts: []string{"9092:9092"},
+				Env: map[string]string{
+					"KAFKA_ADVERTISED_HOST_NAME": "localhost",
+					"KAFKA_ADVERTISED_PORT":      "9092",
+					"KAFKA_ZOOKEEPER_CONNECT":    fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
+					"KAFKA_CREATE_TOPICS":        fmt.Sprintf("%s:1:1", topic),
+				},
+				Networks:   []string{networkName},
+				WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
+			}
+			require.NoError(t, container.Start(), "failed to start container")
+			defer func() {
+				require.NoError(t, container.Terminate(), "terminating container failed")
+			}()
+
+			brokers := []string{
+				fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
+			}
+
+			// Make kafka output
+			t.Logf("rt: starting output plugin")
+			creator := outputs.Outputs["kafka"]
+			output, ok := creator().(*kafkaOutput.Kafka)
+			require.True(t, ok)
+
+			s, _ := serializers.NewInfluxSerializer()
+			output.SetSerializer(s)
+			output.Brokers = brokers
+			output.Topic = topic
+			output.Log = testutil.Logger{}
+
+			require.NoError(t, output.Init())
+			require.NoError(t, output.Connect())
+
+			// Make kafka input
+			t.Logf("rt: starting input plugin")
+			input := KafkaConsumer{
+				Brokers:                brokers,
+				Log:                    testutil.Logger{},
+				Topics:                 []string{topic},
+				MaxUndeliveredMessages: 1,
+				ConnectionStrategy:     tt.connectionStrategy,
+			}
+			parser := &influx.Parser{}
+			require.NoError(t, parser.Init())
+			input.SetParser(parser)
+			require.NoError(t, input.Init())
+
+			acc := testutil.Accumulator{}
+			require.NoError(t, input.Start(&acc))
+
+			// Shove some metrics through
+			expected := testutil.MockMetrics()
+			t.Logf("rt: writing")
+			require.NoError(t, output.Write(expected))
+
+			// Check that they were received
+			t.Logf("rt: expecting")
+			acc.Wait(len(expected))
+			testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
+
+			t.Logf("rt: shutdown")
+			require.NoError(t, output.Close())
+			input.Stop()
+
+			t.Logf("rt: done")
+		})
+	}
+}
+
+func TestExponentialBackoff(t *testing.T) {
+	var err error
+
+	backoff := 10 * time.Millisecond
+	max := 3
+
+	// get an unused port by listening on the next available port, then closing it
+	listener, err := net.Listen("tcp", ":0")
 	require.NoError(t, err)
-	defer func() {
-		require.NoError(t, net.Remove(ctx), "terminating network failed")
-	}()
-
-	t.Logf("rt: starting zookeeper")
-	zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
-	zookeeper := testutil.Container{
-		Image:        "wurstmeister/zookeeper",
-		ExposedPorts: []string{"2181:2181"},
-		Networks:     []string{networkName},
-		WaitingFor:   wait.ForLog("binding to port"),
-		Name:         zookeeperName,
-	}
-	err = zookeeper.Start()
-	require.NoError(t, err, "failed to start container")
-	defer func() {
-		require.NoError(t, zookeeper.Terminate(), "terminating container failed")
-	}()
-
-	t.Logf("rt: starting broker")
-	topic := "Test"
-	container := testutil.Container{
-		Name:         "telegraf-test-kafka-consumer",
-		Image:        "wurstmeister/kafka",
-		ExposedPorts: []string{"9092:9092"},
-		Env: map[string]string{
-			"KAFKA_ADVERTISED_HOST_NAME": "localhost",
-			"KAFKA_ADVERTISED_PORT":      "9092",
-			"KAFKA_ZOOKEEPER_CONNECT":    fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
-			"KAFKA_CREATE_TOPICS":        fmt.Sprintf("%s:1:1", topic),
-		},
-		Networks:   []string{networkName},
-		WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
-	}
-	err = container.Start()
-	require.NoError(t, err, "failed to start container")
-	defer func() {
-		require.NoError(t, container.Terminate(), "terminating container failed")
-	}()
+	port := listener.Addr().(*net.TCPAddr).Port
+	require.NoError(t, listener.Close())
 
+	// try to connect to kafka on that unused port
 	brokers := []string{
-		fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
+		fmt.Sprintf("localhost:%d", port),
 	}
 
-	// Make kafka output
-	t.Logf("rt: starting output plugin")
-	creator := outputs.Outputs["kafka"]
-	output, ok := creator().(*kafkaOutput.Kafka)
-	require.True(t, ok)
-
-	s, _ := serializers.NewInfluxSerializer()
-	output.SetSerializer(s)
-	output.Brokers = brokers
-	output.Topic = topic
-	output.Log = testutil.Logger{}
-
-	require.NoError(t, output.Init())
-	require.NoError(t, output.Connect())
-
-	// Make kafka input
-	t.Logf("rt: starting input plugin")
 	input := KafkaConsumer{
 		Brokers:                brokers,
 		Log:                    testutil.Logger{},
-		Topics:                 []string{topic},
+		Topics:                 []string{"topic"},
 		MaxUndeliveredMessages: 1,
+
+		ReadConfig: kafka.ReadConfig{
+			Config: kafka.Config{
+				MetadataRetryMax:     max,
+				MetadataRetryBackoff: config.Duration(backoff),
+				MetadataRetryType:    "exponential",
+			},
+		},
 	}
 	parser := &influx.Parser{}
-	err = parser.Init()
-	require.NoError(t, err)
+	require.NoError(t, parser.Init())
 	input.SetParser(parser)
-	err = input.Init()
-	require.NoError(t, err)
+
+	// time how long initialization (connection) takes
+	start := time.Now()
+	require.NoError(t, input.Init())
 
 	acc := testutil.Accumulator{}
-	err = input.Start(&acc)
-	require.NoError(t, err)
+	require.Error(t, input.Start(&acc))
+	elapsed := time.Since(start)
+	t.Logf("elapsed %d", elapsed)
 
-	// Shove some metrics through
-	expected := testutil.MockMetrics()
-	t.Logf("rt: writing")
-	require.NoError(t, output.Write(expected))
+	var expectedRetryDuration time.Duration
+	for i := 0; i < max; i++ {
+		expectedRetryDuration += backoff * time.Duration(math.Pow(2, float64(i)))
+	}
+	t.Logf("expected > %d", expectedRetryDuration)
 
-	// Check that they were received
-	t.Logf("rt: expecting")
-	acc.Wait(len(expected))
-	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
+	// Other than the expected retry delay, initializing and starting the
+	// plugin, including initializing a sarama consumer, takes some time.
+	//
+	// It would be nice to check that the actual time is within an expected
+	// range, but we don't know how long the non-retry time should be.
+	//
+	// For now, just check that the elapsed time isn't shorter than we expect
+	// the retry delays to be.
+	require.GreaterOrEqual(t, elapsed, expectedRetryDuration)
 
-	t.Logf("rt: shutdown")
-	require.NoError(t, output.Close())
 	input.Stop()
-
-	t.Logf("rt: done")
+}
+
+func TestExponentialBackoffDefault(t *testing.T) {
+	input := KafkaConsumer{
+		Brokers:                []string{"broker"},
+		Log:                    testutil.Logger{},
+		Topics:                 []string{"topic"},
+		MaxUndeliveredMessages: 1,
+
+		ReadConfig: kafka.ReadConfig{
+			Config: kafka.Config{
+				MetadataRetryType: "exponential",
+			},
+		},
+	}
+	parser := &influx.Parser{}
+	require.NoError(t, parser.Init())
+	input.SetParser(parser)
+
+	require.NoError(t, input.Init())
+
+	// We don't need to start the plugin here since we're only testing
+	// initialization
+
+	// if MetadataRetryBackoff isn't set, it should default to 250 ms
+	require.Equal(t, config.Duration(250*time.Millisecond), input.MetadataRetryBackoff)
 }
diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf
index 89a26f0ba..1c06a65e0 100644
--- a/plugins/inputs/kafka_consumer/sample.conf
+++ b/plugins/inputs/kafka_consumer/sample.conf
@@ -70,6 +70,32 @@
   ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
   # balance_strategy = "range"
 
+  ## Maximum number of retries for metadata operations, including
+  ## connecting. Sets the Sarama library's Metadata.Retry.Max config value. If
+  ## 0 or unset, use the Sarama default of 3.
+  # metadata_retry_max = 0
+
+  ## Type of retry backoff. Valid options: "constant", "exponential"
+  # metadata_retry_type = "constant"
+
+  ## Amount of time to wait before retrying. When metadata_retry_type is
+  ## "constant", each retry is delayed this amount. When "exponential", the
+  ## first retry is delayed this amount, and subsequent delays are doubled. If 0
+  ## or unset, use the Sarama default of 250 ms.
+  # metadata_retry_backoff = 0
+
+  ## Maximum amount of time to wait before retrying when metadata_retry_type is
+  ## "exponential". Ignored for other retry types. If 0, there is no backoff
+  ## limit.
+  # metadata_retry_max_duration = 0
+
+  ## Strategy for making connection to kafka brokers. Valid options: "startup",
+  ## "defer". If set to "defer", the plugin is allowed to start before making a
+  ## connection. This is useful if the broker may be down when telegraf is
+  ## started, but a typo in the broker setting will then cause connection
+  ## failures without any warning at startup.
+  # connection_strategy = "startup"
+
   ## Maximum length of a message to consume, in bytes (default 0/unlimited);
   ## larger messages are dropped
   max_message_len = 1000000
diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go
index af978aa30..a7b384175 100644
--- a/plugins/outputs/kafka/kafka.go
+++ b/plugins/outputs/kafka/kafka.go
@@ -145,7 +145,7 @@ func (k *Kafka) Init() error {
 	}
 
 	config := sarama.NewConfig()
-	if err := k.SetConfig(config); err != nil {
+	if err := k.SetConfig(config, k.Log); err != nil {
 		return err
 	}
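Condensed from the tests above, a sketch of wiring the new options programmatically rather than through TOML. The field names come from this patch; the broker address, topic, and durations are illustrative values only.

```go
package main

import (
	"time"

	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/common/kafka"
	"github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
)

func main() {
	// Example wiring of the new fields; values are illustrative.
	_ = &kafka_consumer.KafkaConsumer{
		Brokers:                []string{"localhost:9092"},
		Topics:                 []string{"telegraf"},
		MaxUndeliveredMessages: 1,
		ConnectionStrategy:     "defer", // start even if the broker is down
		ReadConfig: kafka.ReadConfig{
			Config: kafka.Config{
				MetadataRetryType:        "exponential",
				MetadataRetryBackoff:     config.Duration(250 * time.Millisecond),
				MetadataRetryMaxDuration: config.Duration(10 * time.Second),
			},
		},
	}
}
```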