diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index c69e5f11b..56fdba000 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -103,6 +103,13 @@ plugin and use the old zookeeper connection method. ## '2 * max_processing_time'. # max_processing_time = "100ms" + ## The default number of message bytes to fetch from the broker in each + ## request (default 1MB). This should be larger than the majority of + ## your messages, or else the consumer will spend a lot of time + ## negotiating sizes and not actually consuming. Similar to the JVM's + ## `fetch.message.max.bytes`. + # consumer_fetch_default = "1MB" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 1edc1c060..3c716f0c3 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -43,6 +43,7 @@ type KafkaConsumer struct { BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` + ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` kafka.ReadConfig @@ -127,6 +128,10 @@ func (k *KafkaConsumer) Init() error { cfg.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime) + if k.ConsumerFetchDefault != 0 { + cfg.Consumer.Fetch.Default = int32(k.ConsumerFetchDefault) + } + k.config = cfg return nil }