fix(common.kafka): Enable TLS in Kafka plugins without custom config (#11519)
This commit is contained in:
parent
6e924fcd5c
commit
2b03cd9151
|
|
@ -2,6 +2,7 @@ package kafka
|
|||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
)
|
||||
|
||||
|
|
@ -50,8 +51,9 @@ type Config struct {
|
|||
Version string `toml:"version"`
|
||||
ClientID string `toml:"client_id"`
|
||||
CompressionCodec int `toml:"compression_codec"`
|
||||
EnableTLS *bool `toml:"enable_tls"`
|
||||
|
||||
EnableTLS *bool `toml:"enable_tls" deprecated:"1.17.0;option is ignored"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
// Disable full metadata fetching
|
||||
MetadataFull *bool `toml:"metadata_full"`
|
||||
|
|
@ -76,6 +78,10 @@ func (k *Config) SetConfig(config *sarama.Config) error {
|
|||
|
||||
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
|
||||
|
||||
if k.EnableTLS != nil && *k.EnableTLS {
|
||||
config.Net.TLS.Enable = true
|
||||
}
|
||||
|
||||
tlsConfig, err := k.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -83,7 +89,12 @@ func (k *Config) SetConfig(config *sarama.Config) error {
|
|||
|
||||
if tlsConfig != nil {
|
||||
config.Net.TLS.Config = tlsConfig
|
||||
config.Net.TLS.Enable = true
|
||||
|
||||
// To maintain backwards compatibility, if the enable_tls option is not
|
||||
// set TLS is enabled if a non-default TLS config is used.
|
||||
if k.EnableTLS == nil {
|
||||
config.Net.TLS.Enable = true
|
||||
}
|
||||
}
|
||||
|
||||
if k.MetadataFull != nil {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ plugin and use the old zookeeper connection method.
|
|||
# version = ""
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = true
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
|
|
|||
|
|
@ -135,6 +135,20 @@ func TestInit(t *testing.T) {
|
|||
require.False(t, plugin.config.Net.TLS.Enable)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "enabled tls without tls config",
|
||||
plugin: &KafkaConsumer{
|
||||
ReadConfig: kafka.ReadConfig{
|
||||
Config: kafka.Config{
|
||||
EnableTLS: func(b bool) *bool { return &b }(true),
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
check: func(t *testing.T, plugin *KafkaConsumer) {
|
||||
require.True(t, plugin.config.Net.TLS.Enable)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "default tls with a tls config",
|
||||
plugin: &KafkaConsumer{
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
# version = ""
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = true
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
|
|
|||
|
|
@ -113,6 +113,7 @@ Broker](http://kafka.apache.org/07/quickstart.html) acting a Kafka Producer.
|
|||
# max_message_bytes = 1000000
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = true
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
|
|
|||
|
|
@ -105,6 +105,7 @@
|
|||
# max_message_bytes = 1000000
|
||||
|
||||
## Optional TLS Config
|
||||
# enable_tls = true
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
|
|
|||
Loading…
Reference in New Issue