feat(inputs.kafka_consumer): Option to set default fetch message bytes (#11220)

This commit is contained in:
Albertas Bužinskas 2022-07-26 00:49:34 +03:00 committed by GitHub
parent f96755c156
commit b1546fe1c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 0 deletions

View File

@ -103,6 +103,13 @@ plugin and use the old zookeeper connection method.
## '2 * max_processing_time'.
# max_processing_time = "100ms"
## The default number of message bytes to fetch from the broker in each
## request (default 1MB). This should be larger than the majority of
## your messages, or else the consumer will spend a lot of time
## negotiating sizes and not actually consuming. Similar to the JVM's
## `fetch.message.max.bytes`.
# consumer_fetch_default = "1MB"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:

View File

@ -43,6 +43,7 @@ type KafkaConsumer struct {
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
kafka.ReadConfig
@ -127,6 +128,10 @@ func (k *KafkaConsumer) Init() error {
cfg.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime)
if k.ConsumerFetchDefault != 0 {
cfg.Consumer.Fetch.Default = int32(k.ConsumerFetchDefault)
}
k.config = cfg
return nil
}