diff --git a/plugins/common/kafka/config.go b/plugins/common/kafka/config.go
new file mode 100644
index 000000000..f68030403
--- /dev/null
+++ b/plugins/common/kafka/config.go
@@ -0,0 +1,94 @@
+package kafka
+
+import (
+    "log"
+
+    "github.com/Shopify/sarama"
+    "github.com/influxdata/telegraf/plugins/common/tls"
+)
+
+// ReadConfig for Kafka clients intended to read from Kafka.
+type ReadConfig struct {
+    Config
+}
+
+// SetConfig on the sarama.Config object from the ReadConfig struct.
+func (k *ReadConfig) SetConfig(config *sarama.Config) error {
+    config.Consumer.Return.Errors = true
+
+    return k.Config.SetConfig(config)
+}
+
+// WriteConfig for Kafka clients intended to write to Kafka.
+type WriteConfig struct {
+    Config
+
+    RequiredAcks     int  `toml:"required_acks"`
+    MaxRetry         int  `toml:"max_retry"`
+    MaxMessageBytes  int  `toml:"max_message_bytes"`
+    IdempotentWrites bool `toml:"idempotent_writes"`
+}
+
+// SetConfig on the sarama.Config object from the WriteConfig struct.
+func (k *WriteConfig) SetConfig(config *sarama.Config) error {
+    config.Producer.Return.Successes = true
+    config.Producer.Idempotent = k.IdempotentWrites
+    config.Producer.Retry.Max = k.MaxRetry
+    if k.MaxMessageBytes > 0 {
+        config.Producer.MaxMessageBytes = k.MaxMessageBytes
+    }
+    config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
+    return k.Config.SetConfig(config)
+}
+
+// Config common to all Kafka clients.
+type Config struct {
+    SASLAuth
+    tls.ClientConfig
+
+    Version          string `toml:"version"`
+    ClientID         string `toml:"client_id"`
+    CompressionCodec int    `toml:"compression_codec"`
+
+    // EnableTLS is deprecated and has no effect.
+    EnableTLS *bool `toml:"enable_tls"`
+}
+
+// SetConfig on the sarama.Config object from the Config struct.
+func (k *Config) SetConfig(config *sarama.Config) error {
+    if k.EnableTLS != nil {
+        log.Printf("W! [kafka] enable_tls is deprecated and has no effect; it can safely be removed from the config")
+    }
+    if k.Version != "" {
+        version, err := sarama.ParseKafkaVersion(k.Version)
+        if err != nil {
+            return err
+        }
+
+        config.Version = version
+    }
+
+    if k.ClientID != "" {
+        config.ClientID = k.ClientID
+    } else {
+        config.ClientID = "Telegraf"
+    }
+
+    config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
+
+    tlsConfig, err := k.ClientConfig.TLSConfig()
+    if err != nil {
+        return err
+    }
+
+    if tlsConfig != nil {
+        config.Net.TLS.Config = tlsConfig
+        config.Net.TLS.Enable = true
+    }
+
+    if err := k.SetSASLConfig(config); err != nil {
+        return err
+    }
+
+    return nil
+}
diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index 3535f8fce..ac04925a2 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -35,7 +35,7 @@ and use the old zookeeper connection method.
   # insecure_skip_verify = false
 
   ## SASL authentication credentials. These settings should typically be used
-  ## with TLS encryption enabled using the "enable_tls" option.
+  ## with TLS encryption enabled
   # sasl_username = "kafka"
   # sasl_password = "secret"
 
@@ -62,6 +62,14 @@ and use the old zookeeper connection method.
   ## Name of the consumer group.
   # consumer_group = "telegraf_metrics_consumers"
 
+  ## Compression codec represents the various compression codecs recognized by
+  ## Kafka in messages.
+  ##  0 : None
+  ##  1 : Gzip
+  ##  2 : Snappy
+  ##  3 : LZ4
+  ##  4 : ZSTD
+  # compression_codec = 0
   ## Initial offset position; one of "oldest" or "newest".
# offset = "oldest" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index a0b4b41cf..78feacdd3 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/kafka" - "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -36,7 +35,6 @@ const sampleConfig = ` # version = "" ## Optional TLS Config - # enable_tls = true # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" @@ -44,7 +42,7 @@ const sampleConfig = ` # insecure_skip_verify = false ## SASL authentication credentials. These settings should typically be used - ## with TLS encryption enabled using the "enable_tls" option. + ## with TLS encryption enabled # sasl_username = "kafka" # sasl_password = "secret" @@ -71,6 +69,15 @@ const sampleConfig = ` ## Name of the consumer group. # consumer_group = "telegraf_metrics_consumers" + ## Compression codec represents the various compression codecs recognized by + ## Kafka in messages. + ## 0 : None + ## 1 : Gzip + ## 2 : Snappy + ## 3 : LZ4 + ## 4 : ZSTD + # compression_codec = 0 + ## Initial offset position; one of "oldest" or "newest". # offset = "oldest" @@ -110,7 +117,6 @@ type semaphore chan empty type KafkaConsumer struct { Brokers []string `toml:"brokers"` - ClientID string `toml:"client_id"` ConsumerGroup string `toml:"consumer_group"` MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` @@ -118,12 +124,8 @@ type KafkaConsumer struct { BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` - Version string `toml:"version"` - kafka.SASLAuth - - EnableTLS *bool `toml:"enable_tls"` - tls.ClientConfig + kafka.ReadConfig Log telegraf.Logger `toml:"-"` @@ -173,50 +175,14 @@ func (k *KafkaConsumer) Init() error { } config := sarama.NewConfig() - config.Consumer.Return.Errors = true // Kafka version 0.10.2.0 is required for consumer groups. config.Version = sarama.V0_10_2_0 - if k.Version != "" { - version, err := sarama.ParseKafkaVersion(k.Version) - if err != nil { - return err - } - - config.Version = version - } - - if k.EnableTLS != nil && *k.EnableTLS { - config.Net.TLS.Enable = true - } - - tlsConfig, err := k.ClientConfig.TLSConfig() - if err != nil { + if err := k.SetConfig(config); err != nil { return err } - if tlsConfig != nil { - config.Net.TLS.Config = tlsConfig - - // To maintain backwards compatibility, if the enable_tls option is not - // set TLS is enabled if a non-default TLS config is used. 
-        if k.EnableTLS == nil {
-            k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
-            config.Net.TLS.Enable = true
-        }
-    }
-
-    if err := k.SetSASLConfig(config); err != nil {
-        return err
-    }
-
-    if k.ClientID != "" {
-        config.ClientID = k.ClientID
-    } else {
-        config.ClientID = "Telegraf"
-    }
-
     switch strings.ToLower(k.Offset) {
     case "oldest", "":
         config.Consumer.Offsets.Initial = sarama.OffsetOldest
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 5973fa82a..d7804a01b 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -7,6 +7,7 @@ import (
 
     "github.com/Shopify/sarama"
     "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/plugins/common/kafka"
     "github.com/influxdata/telegraf/plugins/common/tls"
     "github.com/influxdata/telegraf/plugins/parsers/value"
     "github.com/influxdata/telegraf/testutil"
@@ -68,8 +69,12 @@ func TestInit(t *testing.T) {
         {
             name: "parses valid version string",
             plugin: &KafkaConsumer{
-                Version: "1.0.0",
-                Log:     testutil.Logger{},
+                ReadConfig: kafka.ReadConfig{
+                    Config: kafka.Config{
+                        Version: "1.0.0",
+                    },
+                },
+                Log: testutil.Logger{},
             },
             check: func(t *testing.T, plugin *KafkaConsumer) {
                 require.Equal(t, plugin.config.Version, sarama.V1_0_0_0)
@@ -78,16 +83,24 @@ func TestInit(t *testing.T) {
         {
             name: "invalid version string",
             plugin: &KafkaConsumer{
-                Version: "100",
-                Log:     testutil.Logger{},
+                ReadConfig: kafka.ReadConfig{
+                    Config: kafka.Config{
+                        Version: "100",
+                    },
+                },
+                Log: testutil.Logger{},
             },
             initError: true,
         },
         {
             name: "custom client_id",
             plugin: &KafkaConsumer{
-                ClientID: "custom",
-                Log:      testutil.Logger{},
+                ReadConfig: kafka.ReadConfig{
+                    Config: kafka.Config{
+                        ClientID: "custom",
+                    },
+                },
+                Log: testutil.Logger{},
             },
             check: func(t *testing.T, plugin *KafkaConsumer) {
                 require.Equal(t, plugin.config.ClientID, "custom")
@@ -123,8 +136,12 @@ func TestInit(t *testing.T) {
         {
             name: "default tls with a tls config",
             plugin: &KafkaConsumer{
-                ClientConfig: tls.ClientConfig{
-                    InsecureSkipVerify: true,
+                ReadConfig: kafka.ReadConfig{
+                    Config: kafka.Config{
+                        ClientConfig: tls.ClientConfig{
+                            InsecureSkipVerify: true,
+                        },
+                    },
                 },
                 Log: testutil.Logger{},
             },
@@ -133,24 +150,17 @@ func TestInit(t *testing.T) {
             },
         },
         {
-            name: "disable tls",
+            name: "Insecure tls",
             plugin: &KafkaConsumer{
-                EnableTLS: func() *bool { v := false; return &v }(),
-                ClientConfig: tls.ClientConfig{
-                    InsecureSkipVerify: true,
+                ReadConfig: kafka.ReadConfig{
+                    Config: kafka.Config{
+                        ClientConfig: tls.ClientConfig{
+                            InsecureSkipVerify: true,
+                        },
+                    },
                 },
                 Log: testutil.Logger{},
             },
-            check: func(t *testing.T, plugin *KafkaConsumer) {
-                require.False(t, plugin.config.Net.TLS.Enable)
-            },
-        },
-        {
-            name: "enable tls",
-            plugin: &KafkaConsumer{
-                EnableTLS: func() *bool { v := true; return &v }(),
-                Log:       testutil.Logger{},
-            },
             check: func(t *testing.T, plugin *KafkaConsumer) {
                 require.True(t, plugin.config.Net.TLS.Enable)
             },
diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md
index 8c16ee054..e76522018 100644
--- a/plugins/outputs/kafka/README.md
+++ b/plugins/outputs/kafka/README.md
@@ -72,13 +72,18 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
   ## routing_key = "telegraf"
   # routing_key = ""
 
-  ## CompressionCodec represents the various compression codecs recognized by
+  ## Compression codec represents the various compression codecs recognized by
   ## Kafka in messages.
-  ##  0 : No compression
-  ##  1 : Gzip compression
-  ##  2 : Snappy compression
-  ##  3 : LZ4 compression
-  # compression_codec = 0
+  ##  0 : None
+  ##  1 : Gzip
+  ##  2 : Snappy
+  ##  3 : LZ4
+  ##  4 : ZSTD
+  # compression_codec = 0
+
+  ## Idempotent Writes
+  ## If enabled, exactly one copy of each message is written.
+  # idempotent_writes = false
 
   ## RequiredAcks is used in Produce Requests to tell the broker how many
   ## replica acknowledgements it must see before responding
diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go
index 5fdfae48d..ceb2b93a6 100644
--- a/plugins/outputs/kafka/kafka.go
+++ b/plugins/outputs/kafka/kafka.go
@@ -11,7 +11,6 @@ import (
     "github.com/gofrs/uuid"
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/common/kafka"
-    tlsint "github.com/influxdata/telegraf/plugins/common/tls"
     "github.com/influxdata/telegraf/plugins/outputs"
     "github.com/influxdata/telegraf/plugins/serializers"
 )
@@ -25,20 +24,13 @@ var ValidTopicSuffixMethods = []string{
 var zeroTime = time.Unix(0, 0)
 
 type Kafka struct {
-    Brokers          []string    `toml:"brokers"`
-    Topic            string      `toml:"topic"`
-    TopicTag         string      `toml:"topic_tag"`
-    ExcludeTopicTag  bool        `toml:"exclude_topic_tag"`
-    ClientID         string      `toml:"client_id"`
-    TopicSuffix      TopicSuffix `toml:"topic_suffix"`
-    RoutingTag       string      `toml:"routing_tag"`
-    RoutingKey       string      `toml:"routing_key"`
-    CompressionCodec int         `toml:"compression_codec"`
-    RequiredAcks     int         `toml:"required_acks"`
-    MaxRetry         int         `toml:"max_retry"`
-    MaxMessageBytes  int         `toml:"max_message_bytes"`
-
-    Version string `toml:"version"`
+    Brokers         []string    `toml:"brokers"`
+    Topic           string      `toml:"topic"`
+    TopicTag        string      `toml:"topic_tag"`
+    ExcludeTopicTag bool        `toml:"exclude_topic_tag"`
+    TopicSuffix     TopicSuffix `toml:"topic_suffix"`
+    RoutingTag      string      `toml:"routing_tag"`
+    RoutingKey      string      `toml:"routing_key"`
 
     // Legacy TLS config options
     // TLS client certificate
     Certificate string
     // TLS client key
     Key string
     // TLS certificate authority
     CA string
 
-    EnableTLS *bool `toml:"enable_tls"`
-    tlsint.ClientConfig
-
-    kafka.SASLAuth
+    kafka.WriteConfig
 
     Log telegraf.Logger `toml:"-"`
 
@@ -158,14 +147,19 @@ var sampleConfig = `
   ## routing_key = "telegraf"
   # routing_key = ""
 
-  ## CompressionCodec represents the various compression codecs recognized by
+  ## Compression codec represents the various compression codecs recognized by
   ## Kafka in messages.
-  ##  0 : No compression
-  ##  1 : Gzip compression
-  ##  2 : Snappy compression
-  ##  3 : LZ4 compression
+  ##  0 : None
+  ##  1 : Gzip
+  ##  2 : Snappy
+  ##  3 : LZ4
+  ##  4 : ZSTD
   # compression_codec = 0
 
+  ## Idempotent Writes
+  ## If enabled, exactly one copy of each message is written.
+  # idempotent_writes = false
+
   ## RequiredAcks is used in Produce Requests to tell the broker how many
   ## replica acknowledgements it must see before responding
   ##   0 : the producer never waits for an acknowledgement from the broker.
@@ -191,7 +185,6 @@ var sampleConfig = `
   # max_message_bytes = 1000000
 
   ## Optional TLS Config
-  # enable_tls = true
   # tls_ca = "/etc/telegraf/ca.pem"
   # tls_cert = "/etc/telegraf/cert.pem"
   # tls_key = "/etc/telegraf/key.pem"
@@ -278,34 +271,15 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
     k.serializer = serializer
 }
 
-func (k *Kafka) Connect() error {
+func (k *Kafka) Init() error {
     err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
     if err != nil {
         return err
     }
 
     config := sarama.NewConfig()
-    if k.Version != "" {
-        version, err := sarama.ParseKafkaVersion(k.Version)
-        if err != nil {
-            return err
-        }
-        config.Version = version
-    }
-
-    if k.ClientID != "" {
-        config.ClientID = k.ClientID
-    } else {
-        config.ClientID = "Telegraf"
-    }
-
-    config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
-    config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
-    config.Producer.Retry.Max = k.MaxRetry
-    config.Producer.Return.Successes = true
-
-    if k.MaxMessageBytes > 0 {
-        config.Producer.MaxMessageBytes = k.MaxMessageBytes
+    if err := k.SetConfig(config); err != nil {
+        return err
     }
 
     // Legacy support ssl config
@@ -315,30 +289,6 @@ func (k *Kafka) Connect() error {
         k.TLSKey = k.Key
     }
 
-    if k.EnableTLS != nil && *k.EnableTLS {
-        config.Net.TLS.Enable = true
-    }
-
-    tlsConfig, err := k.ClientConfig.TLSConfig()
-    if err != nil {
-        return err
-    }
-
-    if tlsConfig != nil {
-        config.Net.TLS.Config = tlsConfig
-
-        // To maintain backwards compatibility, if the enable_tls option is not
-        // set TLS is enabled if a non-default TLS config is used.
-        if k.EnableTLS == nil {
-            k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
-            config.Net.TLS.Enable = true
-        }
-    }
-
-    if err := k.SetSASLConfig(config); err != nil {
-        return err
-    }
-
     producer, err := k.producerFunc(k.Brokers, config)
     if err != nil {
         return err
@@ -347,6 +297,10 @@ func (k *Kafka) Connect() error {
     return nil
 }
 
+func (k *Kafka) Connect() error {
+    return nil
+}
+
 func (k *Kafka) Close() error {
     return k.producer.Close()
 }
@@ -436,8 +390,10 @@ func init() {
     sarama.Logger = &DebugLogger{}
     outputs.Add("kafka", func() telegraf.Output {
         return &Kafka{
-            MaxRetry:     3,
-            RequiredAcks: -1,
+            WriteConfig: kafka.WriteConfig{
+                MaxRetry:     3,
+                RequiredAcks: -1,
+            },
             producerFunc: sarama.NewSyncProducer,
         }
     })
diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go
index 070eea3f9..4e93515fe 100644
--- a/plugins/outputs/kafka/kafka_test.go
+++ b/plugins/outputs/kafka/kafka_test.go
@@ -25,13 +25,16 @@ func TestConnectAndWrite(t *testing.T) {
     brokers := []string{testutil.GetLocalHost() + ":9092"}
     s, _ := serializers.NewInfluxSerializer()
     k := &Kafka{
-        Brokers:    brokers,
-        Topic:      "Test",
-        serializer: s,
+        Brokers:      brokers,
+        Topic:        "Test",
+        serializer:   s,
+        producerFunc: sarama.NewSyncProducer,
     }
 
     // Verify that we can connect to the Kafka broker
-    err := k.Connect()
+    err := k.Init()
+    require.NoError(t, err)
+    err = k.Connect()
     require.NoError(t, err)
 
     // Verify that we can successfully write data to the kafka broker
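
For reference, and not part of the patch itself: a minimal sketch of how a caller is expected to build a sarama configuration through the shared kafka.WriteConfig added above. Only the types, field names, and the SetConfig signature shown in the diff are taken from the patch; the option values below are illustrative placeholders.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"

	"github.com/influxdata/telegraf/plugins/common/kafka"
)

func main() {
	// Producer-side options, mirroring the TOML settings exposed by the
	// kafka output plugin (the values here are placeholders).
	wc := kafka.WriteConfig{
		Config: kafka.Config{
			Version:          "2.6.0", // parsed with sarama.ParseKafkaVersion
			ClientID:         "Telegraf",
			CompressionCodec: 3, // 3 : LZ4
		},
		RequiredAcks:    -1, // wait for all in-sync replicas
		MaxRetry:        3,
		MaxMessageBytes: 1000000,
	}

	// SetConfig copies the shared settings (version, client ID, compression,
	// TLS, SASL, acks, retries) onto the sarama.Config used by the producer.
	config := sarama.NewConfig()
	if err := wc.SetConfig(config); err != nil {
		log.Fatal(err)
	}

	log.Printf("client_id=%q required_acks=%d", config.ClientID, config.Producer.RequiredAcks)
}
```

The read side is symmetric: ReadConfig.SetConfig sets Consumer.Return.Errors and then delegates to the embedded Config.SetConfig, which is why KafkaConsumer.Init in the patch reduces to a single k.SetConfig(config) call.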