From 82902ebd0625917c9f4a45017a184baaa6ea86d2 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Fri, 31 May 2024 02:27:46 -0600 Subject: [PATCH] feat(inputs.kafka_consumer): Add resolve canonical bootstrap server option (#15368) --- plugins/inputs/kafka_consumer/README.md | 5 +++ .../inputs/kafka_consumer/kafka_consumer.go | 34 ++++++++++--------- plugins/inputs/kafka_consumer/sample.conf | 5 +++ 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 0daadebb6..38b50595b 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -149,6 +149,11 @@ to use them. ## limit. # metadata_retry_max_duration = 0 + ## When set to true, this turns each bootstrap broker address into a set of + ## IPs, then does a reverse lookup on each one to get its canonical hostname. + ## This list of hostnames then replaces the original address list. + ## resolve_canonical_bootstrap_servers_only = false + ## Strategy for making connection to kafka brokers. Valid options: "startup", ## "defer". If set to "defer" the plugin is allowed to start before making a ## connection. This is useful if the broker may be down when telegraf is diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index e820642bc..edbcc97a2 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -34,24 +34,24 @@ type empty struct{} type semaphore chan empty type KafkaConsumer struct { - Brokers []string `toml:"brokers"` - Version string `toml:"kafka_version"` - ConsumerGroup string `toml:"consumer_group"` - MaxMessageLen int `toml:"max_message_len"` - MaxUndeliveredMessages int `toml:"max_undelivered_messages"` - MaxProcessingTime config.Duration `toml:"max_processing_time"` - Offset string `toml:"offset"` - BalanceStrategy string `toml:"balance_strategy"` - Topics []string `toml:"topics"` - TopicRegexps []string `toml:"topic_regexps"` - TopicTag string `toml:"topic_tag"` - MsgHeadersAsTags []string `toml:"msg_headers_as_tags"` - MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"` - ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` - ConnectionStrategy string `toml:"connection_strategy"` + Brokers []string `toml:"brokers"` + Version string `toml:"kafka_version"` + ConsumerGroup string `toml:"consumer_group"` + MaxMessageLen int `toml:"max_message_len"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + MaxProcessingTime config.Duration `toml:"max_processing_time"` + Offset string `toml:"offset"` + BalanceStrategy string `toml:"balance_strategy"` + Topics []string `toml:"topics"` + TopicRegexps []string `toml:"topic_regexps"` + TopicTag string `toml:"topic_tag"` + MsgHeadersAsTags []string `toml:"msg_headers_as_tags"` + MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"` + ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` + ConnectionStrategy string `toml:"connection_strategy"` + ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"` kafka.ReadConfig - kafka.Logger Log telegraf.Logger `toml:"-"` @@ -150,6 +150,8 @@ func (k *KafkaConsumer) Init() error { k.ConsumerCreator = &SaramaCreator{} } + cfg.Net.ResolveCanonicalBootstrapServers = k.ResolveCanonicalBootstrapServersOnly + cfg.Consumer.MaxProcessingTime = time.Duration(k.MaxProcessingTime) if k.ConsumerFetchDefault != 0 { diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index b57b4c2f9..00e6a9489 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -112,6 +112,11 @@ ## limit. # metadata_retry_max_duration = 0 + ## When set to true, this turns each bootstrap broker address into a set of + ## IPs, then does a reverse lookup on each one to get its canonical hostname. + ## This list of hostnames then replaces the original address list. + ## resolve_canonical_bootstrap_servers_only = false + ## Strategy for making connection to kafka brokers. Valid options: "startup", ## "defer". If set to "defer" the plugin is allowed to start before making a ## connection. This is useful if the broker may be down when telegraf is