feat(outputs.kafka): Option to add metric name as record header (#15722)
This commit is contained in:
parent
99b55b71b0
commit
fc198910cf
|
|
@ -127,6 +127,9 @@ to use them.
|
|||
## * now: Uses the time of write
|
||||
# producer_timestamp = metric
|
||||
|
||||
## Add metric name as specified kafka header if not empty
|
||||
# metric_name_header = ""
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = false
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ type Kafka struct {
|
|||
RoutingTag string `toml:"routing_tag"`
|
||||
RoutingKey string `toml:"routing_key"`
|
||||
ProducerTimestamp string `toml:"producer_timestamp"`
|
||||
MetricNameHeader string `toml:"metric_name_header"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
proxy.Socks5ProxyConfig
|
||||
kafka.WriteConfig
|
||||
|
|
@ -209,6 +210,15 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
|||
Value: sarama.ByteEncoder(buf),
|
||||
}
|
||||
|
||||
if k.MetricNameHeader != "" {
|
||||
m.Headers = []sarama.RecordHeader{
|
||||
{
|
||||
Key: []byte(k.MetricNameHeader),
|
||||
Value: []byte(metric.Name()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Negative timestamps are not allowed by the Kafka protocol.
|
||||
if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) {
|
||||
m.Timestamp = metric.Time()
|
||||
|
|
|
|||
|
|
@ -87,6 +87,9 @@
|
|||
## * now: Uses the time of write
|
||||
# producer_timestamp = metric
|
||||
|
||||
## Add metric name as specified kafka header if not empty
|
||||
# metric_name_header = ""
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = false
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
|
|
|
|||
Loading…
Reference in New Issue