feat: Add exponential backoff when connecting or reconnecting and allow plugin to start without making initial connection (#12111)

reimda 2022-11-03 07:01:22 -06:00 committed by GitHub
parent 415cfa8548
commit 494f558b4e
7 changed files with 367 additions and 108 deletions

@@ -1,8 +1,14 @@
package kafka
import (
"fmt"
"math"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
tgConf "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
)
@@ -12,10 +18,10 @@ type ReadConfig struct {
}
// SetConfig on the sarama.Config object from the ReadConfig struct.
func (k *ReadConfig) SetConfig(config *sarama.Config) error {
func (k *ReadConfig) SetConfig(config *sarama.Config, log telegraf.Logger) error {
config.Consumer.Return.Errors = true
return k.Config.SetConfig(config)
return k.Config.SetConfig(config, log)
}
// WriteConfig for Kafka clients that intend to write to Kafka
@@ -29,7 +35,7 @@ type WriteConfig struct {
}
// SetConfig on the sarama.Config object from the WriteConfig struct.
func (k *WriteConfig) SetConfig(config *sarama.Config) error {
func (k *WriteConfig) SetConfig(config *sarama.Config, log telegraf.Logger) error {
config.Producer.Return.Successes = true
config.Producer.Idempotent = k.IdempotentWrites
config.Producer.Retry.Max = k.MaxRetry
@@ -40,7 +46,7 @@ func (k *WriteConfig) SetConfig(config *sarama.Config) error {
if config.Producer.Idempotent {
config.Net.MaxOpenRequests = 1
}
return k.Config.SetConfig(config)
return k.Config.SetConfig(config, log)
}
// Config common to all Kafka clients.
@@ -53,14 +59,29 @@ type Config struct {
CompressionCodec int `toml:"compression_codec"`
EnableTLS *bool `toml:"enable_tls"`
Log telegraf.Logger `toml:"-"`
MetadataRetryMax int `toml:"metadata_retry_max"`
MetadataRetryType string `toml:"metadata_retry_type"`
MetadataRetryBackoff tgConf.Duration `toml:"metadata_retry_backoff"`
MetadataRetryMaxDuration tgConf.Duration `toml:"metadata_retry_max_duration"`
// Disable full metadata fetching
MetadataFull *bool `toml:"metadata_full"`
}
type BackoffFunc func(retries, maxRetries int) time.Duration
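// makeBackoffFunc returns a BackoffFunc that doubles the delay on each retry
// (backoff * 2^retries), capped at maxDuration; a zero maxDuration means no cap.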
func makeBackoffFunc(backoff, maxDuration time.Duration) BackoffFunc {
return func(retries, maxRetries int) time.Duration {
d := time.Duration(math.Pow(2, float64(retries))) * backoff
if maxDuration != 0 && d > maxDuration {
return maxDuration
}
return d
}
}
// SetConfig on the sarama.Config object from the Config struct.
func (k *Config) SetConfig(config *sarama.Config) error {
func (k *Config) SetConfig(config *sarama.Config, log telegraf.Logger) error {
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
@@ -102,5 +123,30 @@ func (k *Config) SetConfig(config *sarama.Config) error {
config.Metadata.Full = *k.MetadataFull
}
if k.MetadataRetryMax != 0 {
config.Metadata.Retry.Max = k.MetadataRetryMax
}
if k.MetadataRetryBackoff != 0 {
// If config.Metadata.Retry.BackoffFunc is set, sarama ignores
// config.Metadata.Retry.Backoff
config.Metadata.Retry.Backoff = time.Duration(k.MetadataRetryBackoff)
}
switch strings.ToLower(k.MetadataRetryType) {
default:
return fmt.Errorf("invalid metadata retry type")
case "exponential":
if k.MetadataRetryBackoff == 0 {
k.MetadataRetryBackoff = tgConf.Duration(250 * time.Millisecond)
log.Warnf("metadata_retry_backoff is 0, using %s", time.Duration(k.MetadataRetryBackoff))
}
config.Metadata.Retry.BackoffFunc = makeBackoffFunc(
time.Duration(k.MetadataRetryBackoff),
time.Duration(k.MetadataRetryMaxDuration),
)
case "constant", "":
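// constant backoff is sarama's default retry behavior; the
// Metadata.Retry.Backoff value set above is used as a fixed delay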
}
return k.SetSASLConfig(config)
}
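The doubling-with-cap schedule above is easy to sanity-check in isolation. Below is a minimal, self-contained sketch (not part of this commit) that mirrors the makeBackoffFunc logic; the 250 ms base matches the commit's fallback default, while the 1 s cap is an arbitrary illustrative value:

package main

import (
    "fmt"
    "math"
    "time"
)

// backoffDelay mirrors makeBackoffFunc: the delay before retry n is
// base * 2^n, capped at maxDuration when a nonzero cap is given.
func backoffDelay(retries int, base, maxDuration time.Duration) time.Duration {
    d := time.Duration(math.Pow(2, float64(retries))) * base
    if maxDuration != 0 && d > maxDuration {
        return maxDuration
    }
    return d
}

func main() {
    base := 250 * time.Millisecond // the commit's fallback default
    maxDur := time.Second          // illustrative cap, not a commit default
    for retry := 0; retry < 5; retry++ {
        fmt.Printf("retry %d: %v\n", retry, backoffDelay(retry, base, maxDur))
    }
    // Prints 250ms, 500ms, 1s, 1s, 1s; the last two are capped
    // (uncapped they would be 2s and 4s).
}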

@@ -0,0 +1,22 @@
package kafka
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestBackoffFunc(t *testing.T) {
b := 250 * time.Millisecond
max := 1100 * time.Millisecond
f := makeBackoffFunc(b, max)
require.Equal(t, b, f(0, 0))
require.Equal(t, b*2, f(1, 0))
require.Equal(t, b*4, f(2, 0))
require.Equal(t, max, f(3, 0)) // uncapped this would be 2000ms, which exceeds max
f = makeBackoffFunc(b, 0) // max = 0 means no cap
require.Equal(t, b*8, f(3, 0)) // with no cap, the full 2000ms applies
}

@@ -90,6 +90,32 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum number of retries for metadata operations, including
## connecting. Sets the Sarama library's Metadata.Retry.Max config value.
## If 0 or unset, the Sarama default of 3 is used.
# metadata_retry_max = 0
## Type of retry backoff. Valid options: "constant", "exponential"
# metadata_retry_type = "constant"
## Amount of time to wait before retrying. When metadata_retry_type is
## "constant", each retry is delayed this amount. When "exponential", the
## first retry is delayed this amount, and subsequent delays are doubled. If 0
## or unset, the Sarama default of 250 ms is used.
# metadata_retry_backoff = 0
## Maximum amount of time to wait before retrying when metadata_retry_type is
## "exponential". Ignored for other retry types. If 0, there is no backoff
## limit.
# metadata_retry_max_duration = 0
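## For example, with the Sarama defaults noted above (3 retries, 250 ms
## backoff), "constant" waits roughly 3 * 250 ms = 750 ms in total before
## giving up, while "exponential" waits roughly 250 + 500 + 1000 = 1750 ms.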
## Strategy for making a connection to Kafka brokers. Valid options: "startup",
## "defer". If set to "defer", the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when Telegraf is
## started, but note that any typos in the broker setting will then cause
## connection failures without warning at startup.
# connection_strategy = "startup"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
@@ -130,3 +156,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[kafka]: https://kafka.apache.org
[kafka_consumer_legacy]: /plugins/inputs/kafka_consumer_legacy/README.md
[input data formats]: /docs/DATA_FORMATS_INPUT.md
## Metrics
The plugin accepts arbitrary input and parses it according to the `data_format`
setting. There is no predefined metric format.
## Example Output
There is no predefined metric format, so output depends on plugin input.

@@ -43,6 +43,7 @@ type KafkaConsumer struct {
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
kafka.ReadConfig
@@ -97,7 +98,7 @@ func (k *KafkaConsumer) Init() error {
// Kafka version 0.10.2.0 is required for consumer groups.
cfg.Version = sarama.V0_10_2_0
if err := k.SetConfig(cfg); err != nil {
if err := k.SetConfig(cfg, k.Log); err != nil {
return fmt.Errorf("SetConfig: %w", err)
}
@@ -131,28 +132,67 @@ func (k *KafkaConsumer) Init() error {
cfg.Consumer.Fetch.Default = int32(k.ConsumerFetchDefault)
}
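// Validate the connection strategy up front; an empty value selects the
// default "startup" behavior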
switch strings.ToLower(k.ConnectionStrategy) {
default:
return fmt.Errorf("invalid connection strategy %q", k.ConnectionStrategy)
case "defer", "startup", "":
}
k.config = cfg
return nil
}
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
func (k *KafkaConsumer) create() error {
var err error
k.consumer, err = k.ConsumerCreator.Create(
k.Brokers,
k.ConsumerGroup,
k.config,
)
if err != nil {
return fmt.Errorf("create consumer: %w", err)
}
return nil
}
func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
k.wg.Add(1)
go func() {
defer k.wg.Done()
for err := range k.consumer.Errors() {
acc.AddError(fmt.Errorf("channel: %w", err))
}
}()
}
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
var err error
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
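// With the default "startup" strategy, create the consumer now so that an
// unreachable or misconfigured broker fails Start immediately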
if k.ConnectionStrategy != "defer" {
err = k.create()
if err != nil {
// k.create already wraps the error with "create consumer:"
return err
}
k.startErrorAdder(acc)
}
// Start consumer goroutine
k.wg.Add(1)
go func() {
var err error
defer k.wg.Done()
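// connection_strategy = "defer": no consumer was created during Start,
// so create it here on first use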
if k.consumer == nil {
err = k.create()
if err != nil {
acc.AddError(fmt.Errorf("create consumer async: %w", err))
return
}
}
k.startErrorAdder(acc)
for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
@@ -171,14 +211,6 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
}
}()
k.wg.Add(1)
go func() {
defer k.wg.Done()
for err := range k.consumer.Errors() {
acc.AddError(fmt.Errorf("channel: %w", err))
}
}()
return nil
}

@@ -3,6 +3,8 @@ package kafka_consumer
import (
"context"
"fmt"
"math"
"net"
"testing"
"time"
@@ -226,8 +228,7 @@ func TestStartStop(t *testing.T) {
require.NoError(t, err)
var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)
require.NoError(t, plugin.Start(&acc))
plugin.Stop()
}
@@ -474,108 +475,205 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
var tests = []struct {
name string
connectionStrategy string
}{
{"connection strategy startup", "startup"},
{"connection strategy defer", "defer"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
t.Logf("rt: starting zookeeper")
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: zookeeperName,
}
require.NoError(t, zookeeper.Start(), "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
t.Logf("rt: starting broker")
topic := "Test"
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", topic),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
require.NoError(t, container.Start(), "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
// Make kafka output
t.Logf("rt: starting output plugin")
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s, _ := serializers.NewInfluxSerializer()
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = topic
output.Log = testutil.Logger{}
require.NoError(t, output.Init())
require.NoError(t, output.Connect())
// Make kafka input
t.Logf("rt: starting input plugin")
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: []string{topic},
MaxUndeliveredMessages: 1,
ConnectionStrategy: tt.connectionStrategy,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
require.NoError(t, input.Init())
acc := testutil.Accumulator{}
require.NoError(t, input.Start(&acc))
// Shove some metrics through
expected := testutil.MockMetrics()
t.Logf("rt: writing")
require.NoError(t, output.Write(expected))
// Check that they were received
t.Logf("rt: expecting")
acc.Wait(len(expected))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
t.Logf("rt: shutdown")
require.NoError(t, output.Close())
input.Stop()
t.Logf("rt: done")
})
}
}
func TestExponentialBackoff(t *testing.T) {
var err error
backoff := 10 * time.Millisecond
max := 3
// get an unused port by listening on next available port, then closing it
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
t.Logf("rt: starting zookeeper")
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: zookeeperName,
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
t.Logf("rt: starting broker")
topic := "Test"
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", topic),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
port := listener.Addr().(*net.TCPAddr).Port
require.NoError(t, listener.Close())
// try to connect to kafka on that unused port
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
fmt.Sprintf("localhost:%d", port),
}
// Make kafka output
t.Logf("rt: starting output plugin")
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s, _ := serializers.NewInfluxSerializer()
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = topic
output.Log = testutil.Logger{}
require.NoError(t, output.Init())
require.NoError(t, output.Connect())
// Make kafka input
t.Logf("rt: starting input plugin")
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: []string{topic},
Topics: []string{"topic"},
MaxUndeliveredMessages: 1,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
MetadataRetryMax: max,
MetadataRetryBackoff: config.Duration(backoff),
MetadataRetryType: "exponential",
},
},
}
parser := &influx.Parser{}
err = parser.Init()
require.NoError(t, err)
require.NoError(t, parser.Init())
input.SetParser(parser)
err = input.Init()
require.NoError(t, err)
// time how long initialization (connection) takes
start := time.Now()
require.NoError(t, input.Init())
acc := testutil.Accumulator{}
err = input.Start(&acc)
require.NoError(t, err)
require.Error(t, input.Start(&acc))
elapsed := time.Since(start)
t.Logf("elapsed %v", elapsed)
// Shove some metrics through
expected := testutil.MockMetrics()
t.Logf("rt: writing")
require.NoError(t, output.Write(expected))
var expectedRetryDuration time.Duration
for i := 0; i < max; i++ {
expectedRetryDuration += backoff * time.Duration(math.Pow(2, float64(i)))
}
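// with backoff = 10ms and max = 3 retries: 10 + 20 + 40 = 70ms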
t.Logf("expected > %v", expectedRetryDuration)
// Check that they were received
t.Logf("rt: expecting")
acc.Wait(len(expected))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
// Other than the expected retry delay, initializing and starting the
// plugin, including initializing a sarama consumer, takes some time.
//
// It would be nice to check that the actual time is within an expected
// range, but we don't know how long the non-retry time should be.
//
// For now, just check that elapsed time isn't shorter than we expect the
// retry delays to be
require.GreaterOrEqual(t, elapsed, expectedRetryDuration)
t.Logf("rt: shutdown")
require.NoError(t, output.Close())
input.Stop()
t.Logf("rt: done")
}
func TestExponentialBackoffDefault(t *testing.T) {
input := KafkaConsumer{
Brokers: []string{"broker"},
Log: testutil.Logger{},
Topics: []string{"topic"},
MaxUndeliveredMessages: 1,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
MetadataRetryType: "exponential",
},
},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
require.NoError(t, input.Init())
// We don't need to start the plugin here since we're only testing
// initialization
// if input.MetadataRetryBackoff isn't set, it should be 250 ms
require.Equal(t, config.Duration(250*time.Millisecond), input.MetadataRetryBackoff)
}

@@ -70,6 +70,32 @@
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum number of retries for metadata operations, including
## connecting. Sets the Sarama library's Metadata.Retry.Max config value.
## If 0 or unset, the Sarama default of 3 is used.
# metadata_retry_max = 0
## Type of retry backoff. Valid options: "constant", "exponential"
# metadata_retry_type = "constant"
## Amount of time to wait before retrying. When metadata_retry_type is
## "constant", each retry is delayed this amount. When "exponential", the
## first retry is delayed this amount, and subsequent delays are doubled. If 0
## or unset, the Sarama default of 250 ms is used.
# metadata_retry_backoff = 0
## Maximum amount of time to wait before retrying when metadata_retry_type is
## "exponential". Ignored for other retry types. If 0, there is no backoff
## limit.
# metadata_retry_max_duration = 0
## Strategy for making a connection to Kafka brokers. Valid options: "startup",
## "defer". If set to "defer", the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when Telegraf is
## started, but note that any typos in the broker setting will then cause
## connection failures without warning at startup.
# connection_strategy = "startup"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000

@@ -145,7 +145,7 @@ func (k *Kafka) Init() error {
}
config := sarama.NewConfig()
if err := k.SetConfig(config); err != nil {
if err := k.SetConfig(config, k.Log); err != nil {
return err
}