From 2cab6ec7ce3659004892126693fffdad8700bfdf Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Wed, 31 Jul 2024 13:35:21 -0600 Subject: [PATCH] feat(outputs.kafka): Option to set producer message timestamp (#15689) --- plugins/outputs/kafka/README.md | 6 ++++++ plugins/outputs/kafka/kafka.go | 25 +++++++++++++++++-------- plugins/outputs/kafka/sample.conf | 6 ++++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 8dd3778db..1670bd557 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -121,6 +121,12 @@ to use them. ## smaller than the broker's 'message.max.bytes'. # max_message_bytes = 1000000 + ## Producer timestamp + ## This option sets the timestamp of the kafka producer message, choose from: + ## * metric: Uses the metric's timestamp + ## * now: Uses the time of write + # producer_timestamp = metric + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 13440f7ca..0b6208cd1 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -31,13 +31,14 @@ var ValidTopicSuffixMethods = []string{ var zeroTime = time.Unix(0, 0) type Kafka struct { - Brokers []string `toml:"brokers"` - Topic string `toml:"topic"` - TopicTag string `toml:"topic_tag"` - ExcludeTopicTag bool `toml:"exclude_topic_tag"` - TopicSuffix TopicSuffix `toml:"topic_suffix"` - RoutingTag string `toml:"routing_tag"` - RoutingKey string `toml:"routing_key"` + Brokers []string `toml:"brokers"` + Topic string `toml:"topic"` + TopicTag string `toml:"topic_tag"` + ExcludeTopicTag bool `toml:"exclude_topic_tag"` + TopicSuffix TopicSuffix `toml:"topic_suffix"` + RoutingTag string `toml:"routing_tag"` + RoutingKey string `toml:"routing_key"` + ProducerTimestamp string `toml:"producer_timestamp"` proxy.Socks5ProxyConfig @@ -152,6 +153,14 @@ func (k *Kafka) Init() error { } k.saramaConfig = config + switch k.ProducerTimestamp { + case "": + k.ProducerTimestamp = "metric" + case "metric", "now": + default: + return fmt.Errorf("unknown producer_timestamp option: %s", k.ProducerTimestamp) + } + return nil } @@ -207,7 +216,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } // Negative timestamps are not allowed by the Kafka protocol. - if !metric.Time().Before(zeroTime) { + if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) { m.Timestamp = metric.Time() } diff --git a/plugins/outputs/kafka/sample.conf b/plugins/outputs/kafka/sample.conf index cf905ea60..56916a95f 100644 --- a/plugins/outputs/kafka/sample.conf +++ b/plugins/outputs/kafka/sample.conf @@ -81,6 +81,12 @@ ## smaller than the broker's 'message.max.bytes'. # max_message_bytes = 1000000 + ## Producer timestamp + ## This option sets the timestamp of the kafka producer message, choose from: + ## * metric: Uses the metric's timestamp + ## * now: Uses the time of write + # producer_timestamp = metric + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem"