From 6fb08bb3da4df8a583f289010307abcdf2006d17 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 5 Jan 2023 16:48:31 +0100 Subject: [PATCH] feat(kafka): Add keep-alive period setting for input and output. (#12459) --- plugins/common/kafka/config.go | 14 ++++++++++---- plugins/inputs/kafka_consumer/README.md | 4 ++++ plugins/inputs/kafka_consumer/sample.conf | 4 ++++ plugins/outputs/kafka/README.md | 6 +++++- plugins/outputs/kafka/sample.conf | 6 +++++- 5 files changed, 28 insertions(+), 6 deletions(-) diff --git a/plugins/common/kafka/config.go b/plugins/common/kafka/config.go index 69e10b777..f007d24ec 100644 --- a/plugins/common/kafka/config.go +++ b/plugins/common/kafka/config.go @@ -54,10 +54,11 @@ type Config struct { SASLAuth tls.ClientConfig - Version string `toml:"version"` - ClientID string `toml:"client_id"` - CompressionCodec int `toml:"compression_codec"` - EnableTLS *bool `toml:"enable_tls"` + Version string `toml:"version"` + ClientID string `toml:"client_id"` + CompressionCodec int `toml:"compression_codec"` + EnableTLS *bool `toml:"enable_tls"` + KeepAlivePeriod *tgConf.Duration `toml:"keep_alive_period"` MetadataRetryMax int `toml:"metadata_retry_max"` MetadataRetryType string `toml:"metadata_retry_type"` @@ -118,6 +119,11 @@ func (k *Config) SetConfig(config *sarama.Config, log telegraf.Logger) error { } } + if k.KeepAlivePeriod != nil { + // Defaults to OS setting (15s currently) + config.Net.KeepAlive = time.Duration(*k.KeepAlivePeriod) + } + if k.MetadataFull != nil { // Defaults to true in Sarama config.Metadata.Full = *k.MetadataFull diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index c1ac1a544..7dc5b7217 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -45,6 +45,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Period between keep alive probes. + ## Defaults to the OS configuration if not specified or zero. + # keep_alive_period = "15s" + ## SASL authentication credentials. These settings should typically be used ## with TLS encryption enabled # sasl_username = "kafka" diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index 1c06a65e0..5ce5afa9b 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -25,6 +25,10 @@ ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Period between keep alive probes. + ## Defaults to the OS configuration if not specified or zero. + # keep_alive_period = "15s" + ## SASL authentication credentials. These settings should typically be used ## with TLS encryption enabled # sasl_username = "kafka" diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 387e36840..488efe914 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -95,7 +95,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## 2 : Snappy ## 3 : LZ4 ## 4 : ZSTD - # compression_codec = 0 + # compression_codec = 0 ## Idempotent Writes ## If enabled, exactly one copy of each message is written. @@ -133,6 +133,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Period between keep alive probes. + ## Defaults to the OS configuration if not specified or zero. + # keep_alive_period = "15s" + ## Optional SOCKS5 proxy to use when connecting to brokers # socks5_enabled = true # socks5_address = "127.0.0.1:1080" diff --git a/plugins/outputs/kafka/sample.conf b/plugins/outputs/kafka/sample.conf index 6b3e40195..383e7313e 100644 --- a/plugins/outputs/kafka/sample.conf +++ b/plugins/outputs/kafka/sample.conf @@ -78,7 +78,7 @@ ## 2 : Snappy ## 3 : LZ4 ## 4 : ZSTD - # compression_codec = 0 + # compression_codec = 0 ## Idempotent Writes ## If enabled, exactly one copy of each message is written. @@ -116,6 +116,10 @@ ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Period between keep alive probes. + ## Defaults to the OS configuration if not specified or zero. + # keep_alive_period = "15s" + ## Optional SOCKS5 proxy to use when connecting to brokers # socks5_enabled = true # socks5_address = "127.0.0.1:1080"