feat(inputs.kafka_consumer): Add option to set metric name from message header (#14320)
This commit is contained in:
parent
d7018226e0
commit
cd83c70241
|
|
@ -68,6 +68,11 @@ to use them.
|
||||||
## are not available
|
## are not available
|
||||||
# msg_headers_to_tags = []
|
# msg_headers_to_tags = []
|
||||||
|
|
||||||
|
## The name of kafka message header which value should override the metric name.
|
||||||
|
## In case when the same header specified in current option and in msg_headers_to_tags
|
||||||
|
## option, it will be excluded from the msg_headers_to_tags list.
|
||||||
|
# msg_header_as_metric_name = ""
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ type KafkaConsumer struct {
|
||||||
TopicRegexps []string `toml:"topic_regexps"`
|
TopicRegexps []string `toml:"topic_regexps"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
|
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
|
||||||
|
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
|
||||||
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
||||||
ConnectionStrategy string `toml:"connection_strategy"`
|
ConnectionStrategy string `toml:"connection_strategy"`
|
||||||
|
|
||||||
|
|
@ -321,11 +322,14 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
|
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
|
||||||
handler.MaxMessageLen = k.MaxMessageLen
|
handler.MaxMessageLen = k.MaxMessageLen
|
||||||
handler.TopicTag = k.TopicTag
|
handler.TopicTag = k.TopicTag
|
||||||
|
handler.MsgHeaderToMetricName = k.MsgHeaderAsMetricName
|
||||||
//if message headers list specified, put it as map to handler
|
//if message headers list specified, put it as map to handler
|
||||||
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
|
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
|
||||||
if len(k.MsgHeadersAsTags) > 0 {
|
if len(k.MsgHeadersAsTags) > 0 {
|
||||||
for _, header := range k.MsgHeadersAsTags {
|
for _, header := range k.MsgHeadersAsTags {
|
||||||
msgHeadersMap[header] = true
|
if k.MsgHeaderAsMetricName != header {
|
||||||
|
msgHeadersMap[header] = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handler.MsgHeadersToTags = msgHeadersMap
|
handler.MsgHeadersToTags = msgHeadersMap
|
||||||
|
|
@ -390,9 +394,10 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parse
|
||||||
|
|
||||||
// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
|
// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
|
||||||
type ConsumerGroupHandler struct {
|
type ConsumerGroupHandler struct {
|
||||||
MaxMessageLen int
|
MaxMessageLen int
|
||||||
TopicTag string
|
TopicTag string
|
||||||
MsgHeadersToTags map[string]bool
|
MsgHeadersToTags map[string]bool
|
||||||
|
MsgHeaderToMetricName string
|
||||||
|
|
||||||
acc telegraf.TrackingAccumulator
|
acc telegraf.TrackingAccumulator
|
||||||
sem semaphore
|
sem semaphore
|
||||||
|
|
@ -482,9 +487,9 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if any message header should be pass as tag
|
|
||||||
headerKey := ""
|
headerKey := ""
|
||||||
if len(h.MsgHeadersToTags) > 0 {
|
// Check if any message header should override metric name or should be pass as tag
|
||||||
|
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
|
||||||
for _, header := range msg.Headers {
|
for _, header := range msg.Headers {
|
||||||
//convert to a string as the header and value are byte arrays.
|
//convert to a string as the header and value are byte arrays.
|
||||||
headerKey = string(header.Key)
|
headerKey = string(header.Key)
|
||||||
|
|
@ -493,6 +498,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
metric.AddTag(headerKey, string(header.Value))
|
metric.AddTag(headerKey, string(header.Value))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if h.MsgHeaderToMetricName == headerKey {
|
||||||
|
for _, metric := range metrics {
|
||||||
|
metric.SetName(string(header.Value))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,11 @@
|
||||||
## works only for Kafka version 0.11+, on lower versions the message headers
|
## works only for Kafka version 0.11+, on lower versions the message headers
|
||||||
## are not available
|
## are not available
|
||||||
# msg_headers_to_tags = []
|
# msg_headers_to_tags = []
|
||||||
|
|
||||||
|
## The name of kafka message header which value should override the metric name.
|
||||||
|
## In case when the same header specified in current option and in msg_headers_to_tags
|
||||||
|
## option, it will be excluded from the msg_headers_to_tags list.
|
||||||
|
# msg_header_as_metric_name = ""
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue