diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 0198d2b97..6b5edd348 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -43,6 +43,16 @@ to use them. ## Kafka brokers. brokers = ["localhost:9092"] + ## Set the minimal supported Kafka version. Should be a string contains + ## 4 digits in case if it is 0 version and 3 digits for versions starting + ## from 1.0.0 separated by dot. This setting enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater. + ## Please, check the list of supported versions at + ## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions + ## ex: kafka_version = "2.6.0" + ## ex: kafka_version = "0.10.2.0" + # kafka_version = "0.10.2.0" + ## Topics to consume. topics = ["telegraf"] @@ -53,14 +63,14 @@ to use them. ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" + ## The list of Kafka message headers that should be pass as metric tags + ## works only for Kafka version 0.11+, on lower versions the message headers + ## are not available + # msg_headers_to_tags = [] + ## Optional Client id # client_id = "Telegraf" - ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Must be 0.10.2.0 or greater. - ## ex: version = "1.1.0" - # version = "" - ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 695c7a601..23aed9609 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -35,6 +35,7 @@ type semaphore chan empty type KafkaConsumer struct { Brokers []string `toml:"brokers"` + Version string `toml:"kafka_version"` ConsumerGroup string `toml:"consumer_group"` MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` @@ -44,6 +45,7 @@ type KafkaConsumer struct { Topics []string `toml:"topics"` TopicRegexps []string `toml:"topic_regexps"` TopicTag string `toml:"topic_tag"` + MsgHeadersAsTags []string `toml:"msg_headers_as_tags"` ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` ConnectionStrategy string `toml:"connection_strategy"` @@ -109,7 +111,15 @@ func (k *KafkaConsumer) Init() error { cfg := sarama.NewConfig() // Kafka version 0.10.2.0 is required for consumer groups. + // Try to parse version from config. If can not, set default cfg.Version = sarama.V0_10_2_0 + if k.Version != "" { + version, err := sarama.ParseKafkaVersion(k.Version) + if err != nil { + return fmt.Errorf("invalid version: %w", err) + } + cfg.Version = version + } if err := k.SetConfig(cfg, k.Log); err != nil { return fmt.Errorf("SetConfig: %w", err) @@ -311,6 +321,15 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log) handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag + //if message headers list specified, put it as map to handler + msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags)) + if len(k.MsgHeadersAsTags) > 0 { + for _, header := range k.MsgHeadersAsTags { + msgHeadersMap[header] = true + } + } + handler.MsgHeadersToTags = msgHeadersMap + // We need to copy allWantedTopics; the Consume() is // long-running and we can easily deadlock if our // topic-update-checker fires. @@ -371,8 +390,9 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn te // ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation. type ConsumerGroupHandler struct { - MaxMessageLen int - TopicTag string + MaxMessageLen int + TopicTag string + MsgHeadersToTags map[string]bool acc telegraf.TrackingAccumulator sem semaphore @@ -467,6 +487,22 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * return err } + // Check if any message header should be pass as tag + header_key := "" + if len(h.MsgHeadersToTags) > 0 { + for _, header := range msg.Headers { + //convert to a string as the header and value are byte arrays. + header_key = string(header.Key) + if _, exists := h.MsgHeadersToTags[header_key]; exists { + // If message header should be pass as tag then add it to the metrics + for _, metric := range metrics { + metric.AddTag(header_key, string(header.Value)) + } + } + } + } + + // Add topic name as tag with TopicTag name specified in the config if len(h.TopicTag) > 0 { for _, metric := range metrics { metric.AddTag(h.TopicTag, msg.Topic) diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index c618d1610..28fdb215a 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -3,6 +3,16 @@ ## Kafka brokers. brokers = ["localhost:9092"] + ## Set the minimal supported Kafka version. Should be a string contains + ## 4 digits in case if it is 0 version and 3 digits for versions starting + ## from 1.0.0 separated by dot. This setting enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater. + ## Please, check the list of supported versions at + ## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions + ## ex: kafka_version = "2.6.0" + ## ex: kafka_version = "0.10.2.0" + # kafka_version = "0.10.2.0" + ## Topics to consume. topics = ["telegraf"] @@ -13,14 +23,14 @@ ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" + ## The list of Kafka message headers that should be pass as metric tags + ## works only for Kafka version 0.11+, on lower versions the message headers + ## are not available + # msg_headers_to_tags = [] + ## Optional Client id # client_id = "Telegraf" - ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Must be 0.10.2.0 or greater. - ## ex: version = "1.1.0" - # version = "" - ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem"