From b1546fe1c7ad14406b009710ce00451fd44a3861 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Albertas=20Bu=C5=BEinskas?= Date: Tue, 26 Jul 2022 00:49:34 +0300 Subject: [PATCH] feat(inputs.kafka_consumer): Option to set default fetch message bytes (#11220) --- plugins/inputs/kafka_consumer/README.md | 7 +++++++ plugins/inputs/kafka_consumer/kafka_consumer.go | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index c69e5f11b..56fdba000 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -103,6 +103,13 @@ plugin and use the old zookeeper connection method. ## '2 * max_processing_time'. # max_processing_time = "100ms" + ## The default number of message bytes to fetch from the broker in each + ## request (default 1MB). This should be larger than the majority of + ## your messages, or else the consumer will spend a lot of time + ## negotiating sizes and not actually consuming. Similar to the JVM's + ## `fetch.message.max.bytes`. + # consumer_fetch_default = "1MB" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 1edc1c060..3c716f0c3 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -43,6 +43,7 @@ type KafkaConsumer struct { BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` + ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` kafka.ReadConfig @@ -127,6 +128,10 @@ func (k *KafkaConsumer) Init() error { cfg.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime) + if k.ConsumerFetchDefault != 0 { + cfg.Consumer.Fetch.Default = int32(k.ConsumerFetchDefault) + } + k.config = cfg return nil }