feat: add max_processing_time config to Kafka Consumer input (#9988)
This commit is contained in:
parent
96c66d3a45
commit
343e846480
|
|
@ -93,6 +93,15 @@ and use the old zookeeper connection method.
|
||||||
## waiting until the next flush_interval.
|
## waiting until the next flush_interval.
|
||||||
# max_undelivered_messages = 1000
|
# max_undelivered_messages = 1000
|
||||||
|
|
||||||
|
## Maximum amount of time the consumer should take to process messages. If
|
||||||
|
## the debug log prints messages from sarama about 'abandoning subscription
|
||||||
|
## to [topic] because consuming was taking too long', increase this value to
|
||||||
|
## longer than the time taken by the output plugin(s).
|
||||||
|
##
|
||||||
|
## Note that the effective timeout could be between 'max_processing_time' and
|
||||||
|
## '2 * max_processing_time'.
|
||||||
|
# max_processing_time = "100ms"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/common/kafka"
|
"github.com/influxdata/telegraf/plugins/common/kafka"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
|
@ -101,6 +102,15 @@ const sampleConfig = `
|
||||||
## waiting until the next flush_interval.
|
## waiting until the next flush_interval.
|
||||||
# max_undelivered_messages = 1000
|
# max_undelivered_messages = 1000
|
||||||
|
|
||||||
|
## Maximum amount of time the consumer should take to process messages. If
|
||||||
|
## the debug log prints messages from sarama about 'abandoning subscription
|
||||||
|
## to [topic] because consuming was taking too long', increase this value to
|
||||||
|
## longer than the time taken by the output plugin(s).
|
||||||
|
##
|
||||||
|
## Note that the effective timeout could be between 'max_processing_time' and
|
||||||
|
## '2 * max_processing_time'.
|
||||||
|
# max_processing_time = "100ms"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
@ -110,6 +120,7 @@ const sampleConfig = `
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultMaxUndeliveredMessages = 1000
|
defaultMaxUndeliveredMessages = 1000
|
||||||
|
defaultMaxProcessingTime = config.Duration(100 * time.Millisecond)
|
||||||
defaultConsumerGroup = "telegraf_metrics_consumers"
|
defaultConsumerGroup = "telegraf_metrics_consumers"
|
||||||
reconnectDelay = 5 * time.Second
|
reconnectDelay = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
@ -118,14 +129,15 @@ type empty struct{}
|
||||||
type semaphore chan empty
|
type semaphore chan empty
|
||||||
|
|
||||||
type KafkaConsumer struct {
|
type KafkaConsumer struct {
|
||||||
Brokers []string `toml:"brokers"`
|
Brokers []string `toml:"brokers"`
|
||||||
ConsumerGroup string `toml:"consumer_group"`
|
ConsumerGroup string `toml:"consumer_group"`
|
||||||
MaxMessageLen int `toml:"max_message_len"`
|
MaxMessageLen int `toml:"max_message_len"`
|
||||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
Offset string `toml:"offset"`
|
MaxProcessingTime config.Duration `toml:"max_processing_time"`
|
||||||
BalanceStrategy string `toml:"balance_strategy"`
|
Offset string `toml:"offset"`
|
||||||
Topics []string `toml:"topics"`
|
BalanceStrategy string `toml:"balance_strategy"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
Topics []string `toml:"topics"`
|
||||||
|
TopicTag string `toml:"topic_tag"`
|
||||||
|
|
||||||
kafka.ReadConfig
|
kafka.ReadConfig
|
||||||
|
|
||||||
|
|
@ -172,6 +184,9 @@ func (k *KafkaConsumer) Init() error {
|
||||||
if k.MaxUndeliveredMessages == 0 {
|
if k.MaxUndeliveredMessages == 0 {
|
||||||
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
|
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
|
||||||
}
|
}
|
||||||
|
if time.Duration(k.MaxProcessingTime) == 0 {
|
||||||
|
k.MaxProcessingTime = defaultMaxProcessingTime
|
||||||
|
}
|
||||||
if k.ConsumerGroup == "" {
|
if k.ConsumerGroup == "" {
|
||||||
k.ConsumerGroup = defaultConsumerGroup
|
k.ConsumerGroup = defaultConsumerGroup
|
||||||
}
|
}
|
||||||
|
|
@ -209,6 +224,8 @@ func (k *KafkaConsumer) Init() error {
|
||||||
k.ConsumerCreator = &SaramaCreator{}
|
k.ConsumerCreator = &SaramaCreator{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime)
|
||||||
|
|
||||||
k.config = config
|
k.config = config
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/common/kafka"
|
"github.com/influxdata/telegraf/plugins/common/kafka"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/value"
|
"github.com/influxdata/telegraf/plugins/parsers/value"
|
||||||
|
|
@ -64,6 +65,7 @@ func TestInit(t *testing.T) {
|
||||||
require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages)
|
require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages)
|
||||||
require.Equal(t, plugin.config.ClientID, "Telegraf")
|
require.Equal(t, plugin.config.ClientID, "Telegraf")
|
||||||
require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest)
|
require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest)
|
||||||
|
require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 100*time.Millisecond)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -165,6 +167,16 @@ func TestInit(t *testing.T) {
|
||||||
require.True(t, plugin.config.Net.TLS.Enable)
|
require.True(t, plugin.config.Net.TLS.Enable)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "custom max_processing_time",
|
||||||
|
plugin: &KafkaConsumer{
|
||||||
|
MaxProcessingTime: config.Duration(1000 * time.Millisecond),
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
},
|
||||||
|
check: func(t *testing.T, plugin *KafkaConsumer) {
|
||||||
|
require.Equal(t, plugin.config.Consumer.MaxProcessingTime, 1000*time.Millisecond)
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue