From 343e846480d575afd96868e6b537d624d05049fe Mon Sep 17 00:00:00 2001 From: Chris Ruscio Date: Thu, 28 Oct 2021 16:35:22 -0400 Subject: [PATCH] feat: add max_processing_time config to Kafka Consumer input (#9988) --- plugins/inputs/kafka_consumer/README.md | 9 +++++ .../inputs/kafka_consumer/kafka_consumer.go | 33 ++++++++++++++----- .../kafka_consumer/kafka_consumer_test.go | 12 +++++++ 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 741f24d04..f4629ed4e 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -93,6 +93,15 @@ and use the old zookeeper connection method. ## waiting until the next flush_interval. # max_undelivered_messages = 1000 + ## Maximum amount of time the consumer should take to process messages. If + ## the debug log prints messages from sarama about 'abandoning subscription + ## to [topic] because consuming was taking too long', increase this value to + ## longer than the time taken by the output plugin(s). + ## + ## Note that the effective timeout could be between 'max_processing_time' and + ## '2 * max_processing_time'. + # max_processing_time = "100ms" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index b20004a87..1aff773a5 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -10,6 +10,7 @@ import ( "github.com/Shopify/sarama" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/inputs" @@ -101,6 +102,15 @@ const sampleConfig = ` ## waiting until the next flush_interval. # max_undelivered_messages = 1000 + ## Maximum amount of time the consumer should take to process messages. If + ## the debug log prints messages from sarama about 'abandoning subscription + ## to [topic] because consuming was taking too long', increase this value to + ## longer than the time taken by the output plugin(s). + ## + ## Note that the effective timeout could be between 'max_processing_time' and + ## '2 * max_processing_time'. + # max_processing_time = "100ms" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -110,6 +120,7 @@ const sampleConfig = ` const ( defaultMaxUndeliveredMessages = 1000 + defaultMaxProcessingTime = config.Duration(100 * time.Millisecond) defaultConsumerGroup = "telegraf_metrics_consumers" reconnectDelay = 5 * time.Second ) @@ -118,14 +129,15 @@ type empty struct{} type semaphore chan empty type KafkaConsumer struct { - Brokers []string `toml:"brokers"` - ConsumerGroup string `toml:"consumer_group"` - MaxMessageLen int `toml:"max_message_len"` - MaxUndeliveredMessages int `toml:"max_undelivered_messages"` - Offset string `toml:"offset"` - BalanceStrategy string `toml:"balance_strategy"` - Topics []string `toml:"topics"` - TopicTag string `toml:"topic_tag"` + Brokers []string `toml:"brokers"` + ConsumerGroup string `toml:"consumer_group"` + MaxMessageLen int `toml:"max_message_len"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + MaxProcessingTime config.Duration `toml:"max_processing_time"` + Offset string `toml:"offset"` + BalanceStrategy string `toml:"balance_strategy"` + Topics []string `toml:"topics"` + TopicTag string `toml:"topic_tag"` kafka.ReadConfig @@ -172,6 +184,9 @@ func (k *KafkaConsumer) Init() error { if k.MaxUndeliveredMessages == 0 { k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages } + if time.Duration(k.MaxProcessingTime) == 0 { + k.MaxProcessingTime = defaultMaxProcessingTime + } if k.ConsumerGroup == "" { k.ConsumerGroup = defaultConsumerGroup } @@ -209,6 +224,8 @@ func (k *KafkaConsumer) Init() error { k.ConsumerCreator = &SaramaCreator{} } + config.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime) + k.config = config return nil } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 68fd9e062..7d31dad92 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/parsers/value" @@ -64,6 +65,7 @@ func TestInit(t *testing.T) { require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages) require.Equal(t, plugin.config.ClientID, "Telegraf") require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest) + require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 100*time.Millisecond) }, }, { @@ -165,6 +167,16 @@ func TestInit(t *testing.T) { require.True(t, plugin.config.Net.TLS.Enable) }, }, + { + name: "custom max_processing_time", + plugin: &KafkaConsumer{ + MaxProcessingTime: config.Duration(1000 * time.Millisecond), + Log: testutil.Logger{}, + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 1000*time.Millisecond) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {