feat(outputs.kafka): Option to set producer message timestamp (#15689)
This commit is contained in:
parent
b090cf621c
commit
2cab6ec7ce
|
|
@ -121,6 +121,12 @@ to use them.
|
||||||
## smaller than the broker's 'message.max.bytes'.
|
## smaller than the broker's 'message.max.bytes'.
|
||||||
# max_message_bytes = 1000000
|
# max_message_bytes = 1000000
|
||||||
|
|
||||||
|
## Producer timestamp
|
||||||
|
## This option sets the timestamp of the kafka producer message, choose from:
|
||||||
|
## * metric: Uses the metric's timestamp
|
||||||
|
## * now: Uses the time of write
|
||||||
|
# producer_timestamp = metric
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# enable_tls = false
|
# enable_tls = false
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
|
|
||||||
|
|
@ -31,13 +31,14 @@ var ValidTopicSuffixMethods = []string{
|
||||||
var zeroTime = time.Unix(0, 0)
|
var zeroTime = time.Unix(0, 0)
|
||||||
|
|
||||||
type Kafka struct {
|
type Kafka struct {
|
||||||
Brokers []string `toml:"brokers"`
|
Brokers []string `toml:"brokers"`
|
||||||
Topic string `toml:"topic"`
|
Topic string `toml:"topic"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
|
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
|
||||||
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
RoutingKey string `toml:"routing_key"`
|
RoutingKey string `toml:"routing_key"`
|
||||||
|
ProducerTimestamp string `toml:"producer_timestamp"`
|
||||||
|
|
||||||
proxy.Socks5ProxyConfig
|
proxy.Socks5ProxyConfig
|
||||||
|
|
||||||
|
|
@ -152,6 +153,14 @@ func (k *Kafka) Init() error {
|
||||||
}
|
}
|
||||||
k.saramaConfig = config
|
k.saramaConfig = config
|
||||||
|
|
||||||
|
switch k.ProducerTimestamp {
|
||||||
|
case "":
|
||||||
|
k.ProducerTimestamp = "metric"
|
||||||
|
case "metric", "now":
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown producer_timestamp option: %s", k.ProducerTimestamp)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -207,7 +216,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Negative timestamps are not allowed by the Kafka protocol.
|
// Negative timestamps are not allowed by the Kafka protocol.
|
||||||
if !metric.Time().Before(zeroTime) {
|
if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) {
|
||||||
m.Timestamp = metric.Time()
|
m.Timestamp = metric.Time()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,12 @@
|
||||||
## smaller than the broker's 'message.max.bytes'.
|
## smaller than the broker's 'message.max.bytes'.
|
||||||
# max_message_bytes = 1000000
|
# max_message_bytes = 1000000
|
||||||
|
|
||||||
|
## Producer timestamp
|
||||||
|
## This option sets the timestamp of the kafka producer message, choose from:
|
||||||
|
## * metric: Uses the metric's timestamp
|
||||||
|
## * now: Uses the time of write
|
||||||
|
# producer_timestamp = metric
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# enable_tls = false
|
# enable_tls = false
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue