feat(inputs.kafka_consumer): Add message headers as metric tags. (#13924)
This commit is contained in:
parent
a542899811
commit
5ab2468789
|
|
@ -43,6 +43,16 @@ to use them.
|
||||||
## Kafka brokers.
|
## Kafka brokers.
|
||||||
brokers = ["localhost:9092"]
|
brokers = ["localhost:9092"]
|
||||||
|
|
||||||
|
## Set the minimal supported Kafka version. Should be a string contains
|
||||||
|
## 4 digits in case if it is 0 version and 3 digits for versions starting
|
||||||
|
## from 1.0.0 separated by dot. This setting enables the use of new
|
||||||
|
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
|
||||||
|
## Please, check the list of supported versions at
|
||||||
|
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
|
||||||
|
## ex: kafka_version = "2.6.0"
|
||||||
|
## ex: kafka_version = "0.10.2.0"
|
||||||
|
# kafka_version = "0.10.2.0"
|
||||||
|
|
||||||
## Topics to consume.
|
## Topics to consume.
|
||||||
topics = ["telegraf"]
|
topics = ["telegraf"]
|
||||||
|
|
||||||
|
|
@ -53,14 +63,14 @@ to use them.
|
||||||
## When set this tag will be added to all metrics with the topic as the value.
|
## When set this tag will be added to all metrics with the topic as the value.
|
||||||
# topic_tag = ""
|
# topic_tag = ""
|
||||||
|
|
||||||
|
## The list of Kafka message headers that should be pass as metric tags
|
||||||
|
## works only for Kafka version 0.11+, on lower versions the message headers
|
||||||
|
## are not available
|
||||||
|
# msg_headers_to_tags = []
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
## Set the minimal supported Kafka version. Setting this enables the use of new
|
|
||||||
## Kafka features and APIs. Must be 0.10.2.0 or greater.
|
|
||||||
## ex: version = "1.1.0"
|
|
||||||
# version = ""
|
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# enable_tls = false
|
# enable_tls = false
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ type semaphore chan empty
|
||||||
|
|
||||||
type KafkaConsumer struct {
|
type KafkaConsumer struct {
|
||||||
Brokers []string `toml:"brokers"`
|
Brokers []string `toml:"brokers"`
|
||||||
|
Version string `toml:"kafka_version"`
|
||||||
ConsumerGroup string `toml:"consumer_group"`
|
ConsumerGroup string `toml:"consumer_group"`
|
||||||
MaxMessageLen int `toml:"max_message_len"`
|
MaxMessageLen int `toml:"max_message_len"`
|
||||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
|
|
@ -44,6 +45,7 @@ type KafkaConsumer struct {
|
||||||
Topics []string `toml:"topics"`
|
Topics []string `toml:"topics"`
|
||||||
TopicRegexps []string `toml:"topic_regexps"`
|
TopicRegexps []string `toml:"topic_regexps"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
|
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
|
||||||
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
||||||
ConnectionStrategy string `toml:"connection_strategy"`
|
ConnectionStrategy string `toml:"connection_strategy"`
|
||||||
|
|
||||||
|
|
@ -109,7 +111,15 @@ func (k *KafkaConsumer) Init() error {
|
||||||
cfg := sarama.NewConfig()
|
cfg := sarama.NewConfig()
|
||||||
|
|
||||||
// Kafka version 0.10.2.0 is required for consumer groups.
|
// Kafka version 0.10.2.0 is required for consumer groups.
|
||||||
|
// Try to parse version from config. If can not, set default
|
||||||
cfg.Version = sarama.V0_10_2_0
|
cfg.Version = sarama.V0_10_2_0
|
||||||
|
if k.Version != "" {
|
||||||
|
version, err := sarama.ParseKafkaVersion(k.Version)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid version: %w", err)
|
||||||
|
}
|
||||||
|
cfg.Version = version
|
||||||
|
}
|
||||||
|
|
||||||
if err := k.SetConfig(cfg, k.Log); err != nil {
|
if err := k.SetConfig(cfg, k.Log); err != nil {
|
||||||
return fmt.Errorf("SetConfig: %w", err)
|
return fmt.Errorf("SetConfig: %w", err)
|
||||||
|
|
@ -311,6 +321,15 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
|
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
|
||||||
handler.MaxMessageLen = k.MaxMessageLen
|
handler.MaxMessageLen = k.MaxMessageLen
|
||||||
handler.TopicTag = k.TopicTag
|
handler.TopicTag = k.TopicTag
|
||||||
|
//if message headers list specified, put it as map to handler
|
||||||
|
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
|
||||||
|
if len(k.MsgHeadersAsTags) > 0 {
|
||||||
|
for _, header := range k.MsgHeadersAsTags {
|
||||||
|
msgHeadersMap[header] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handler.MsgHeadersToTags = msgHeadersMap
|
||||||
|
|
||||||
// We need to copy allWantedTopics; the Consume() is
|
// We need to copy allWantedTopics; the Consume() is
|
||||||
// long-running and we can easily deadlock if our
|
// long-running and we can easily deadlock if our
|
||||||
// topic-update-checker fires.
|
// topic-update-checker fires.
|
||||||
|
|
@ -371,8 +390,9 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn te
|
||||||
|
|
||||||
// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
|
// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
|
||||||
type ConsumerGroupHandler struct {
|
type ConsumerGroupHandler struct {
|
||||||
MaxMessageLen int
|
MaxMessageLen int
|
||||||
TopicTag string
|
TopicTag string
|
||||||
|
MsgHeadersToTags map[string]bool
|
||||||
|
|
||||||
acc telegraf.TrackingAccumulator
|
acc telegraf.TrackingAccumulator
|
||||||
sem semaphore
|
sem semaphore
|
||||||
|
|
@ -467,6 +487,22 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if any message header should be pass as tag
|
||||||
|
header_key := ""
|
||||||
|
if len(h.MsgHeadersToTags) > 0 {
|
||||||
|
for _, header := range msg.Headers {
|
||||||
|
//convert to a string as the header and value are byte arrays.
|
||||||
|
header_key = string(header.Key)
|
||||||
|
if _, exists := h.MsgHeadersToTags[header_key]; exists {
|
||||||
|
// If message header should be pass as tag then add it to the metrics
|
||||||
|
for _, metric := range metrics {
|
||||||
|
metric.AddTag(header_key, string(header.Value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add topic name as tag with TopicTag name specified in the config
|
||||||
if len(h.TopicTag) > 0 {
|
if len(h.TopicTag) > 0 {
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
metric.AddTag(h.TopicTag, msg.Topic)
|
metric.AddTag(h.TopicTag, msg.Topic)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,16 @@
|
||||||
## Kafka brokers.
|
## Kafka brokers.
|
||||||
brokers = ["localhost:9092"]
|
brokers = ["localhost:9092"]
|
||||||
|
|
||||||
|
## Set the minimal supported Kafka version. Should be a string contains
|
||||||
|
## 4 digits in case if it is 0 version and 3 digits for versions starting
|
||||||
|
## from 1.0.0 separated by dot. This setting enables the use of new
|
||||||
|
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
|
||||||
|
## Please, check the list of supported versions at
|
||||||
|
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
|
||||||
|
## ex: kafka_version = "2.6.0"
|
||||||
|
## ex: kafka_version = "0.10.2.0"
|
||||||
|
# kafka_version = "0.10.2.0"
|
||||||
|
|
||||||
## Topics to consume.
|
## Topics to consume.
|
||||||
topics = ["telegraf"]
|
topics = ["telegraf"]
|
||||||
|
|
||||||
|
|
@ -13,14 +23,14 @@
|
||||||
## When set this tag will be added to all metrics with the topic as the value.
|
## When set this tag will be added to all metrics with the topic as the value.
|
||||||
# topic_tag = ""
|
# topic_tag = ""
|
||||||
|
|
||||||
|
## The list of Kafka message headers that should be pass as metric tags
|
||||||
|
## works only for Kafka version 0.11+, on lower versions the message headers
|
||||||
|
## are not available
|
||||||
|
# msg_headers_to_tags = []
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
## Set the minimal supported Kafka version. Setting this enables the use of new
|
|
||||||
## Kafka features and APIs. Must be 0.10.2.0 or greater.
|
|
||||||
## ex: version = "1.1.0"
|
|
||||||
# version = ""
|
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# enable_tls = false
|
# enable_tls = false
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue