Enable kafka zstd compression and idempotent writes (#8435)

This commit is contained in:
Steven Soroka 2020-11-23 15:51:58 -05:00 committed by GitHub
parent 0fcfee0caf
commit 8b30bb9534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 194 additions and 152 deletions

View File

@ -0,0 +1,94 @@
package kafka
import (
"log"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/plugins/common/tls"
)
// ReadConfig for kafka clients meaning to read from Kafka.
// It carries only the common Config; consumer-specific sarama settings
// are applied in its SetConfig method.
type ReadConfig struct {
	Config
}
// SetConfig on the sarama.Config object from the ReadConfig struct.
// It turns on consumer error reporting and then applies the shared
// Config settings (version, client id, TLS, SASL, ...).
func (k *ReadConfig) SetConfig(cfg *sarama.Config) error {
	// Deliver consumer errors on the Errors channel instead of
	// dropping them silently.
	cfg.Consumer.Return.Errors = true

	return k.Config.SetConfig(cfg)
}
// WriteConfig for kafka clients meaning to write to kafka
type WriteConfig struct {
	Config

	// RequiredAcks is passed through to sarama.RequiredAcks
	// (-1 = all in-sync replicas, 0 = none, 1 = leader only).
	RequiredAcks int `toml:"required_acks"`
	// MaxRetry is the maximum number of times to retry sending a message.
	MaxRetry int `toml:"max_retry"`
	// MaxMessageBytes caps the size of a produced message; values <= 0
	// leave sarama's default in place.
	MaxMessageBytes int `toml:"max_message_bytes"`
	// IdempotentWrites asks the broker to write exactly one copy of
	// each message.
	IdempotentWrites bool `toml:"idempotent_writes"`
}
// SetConfig on the sarama.Config object from the WriteConfig struct.
//
// It enables producer success returns (required by the sync producer),
// applies idempotence, retry, message-size and ack settings, and then
// delegates to the common Config.SetConfig.
func (k *WriteConfig) SetConfig(config *sarama.Config) error {
	config.Producer.Return.Successes = true
	config.Producer.Idempotent = k.IdempotentWrites
	config.Producer.Retry.Max = k.MaxRetry
	if k.MaxMessageBytes > 0 {
		config.Producer.MaxMessageBytes = k.MaxMessageBytes
	}
	config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
	// Sarama's config validation rejects an idempotent producer unless
	// at most one request is in flight per broker connection, so force
	// that here rather than failing later at client creation.
	if config.Producer.Idempotent {
		config.Net.MaxOpenRequests = 1
	}
	return k.Config.SetConfig(config)
}
// Config common to all Kafka clients.
type Config struct {
	SASLAuth
	tls.ClientConfig

	// Version is the Kafka protocol version string, parsed with
	// sarama.ParseKafkaVersion (e.g. "1.0.0"); empty keeps sarama's default.
	Version string `toml:"version"`
	// ClientID identifies this client to the brokers; defaults to
	// "Telegraf" when empty.
	ClientID string `toml:"client_id"`
	// CompressionCodec is cast directly to sarama.CompressionCodec
	// (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd).
	CompressionCodec int `toml:"compression_codec"`

	// EnableTLS deprecated
	EnableTLS *bool `toml:"enable_tls"`
}
// SetConfig on the sarama.Config object from the Config struct.
// It applies the protocol version, client id, compression codec,
// TLS and SASL settings shared by all Kafka plugins.
func (k *Config) SetConfig(cfg *sarama.Config) error {
	// enable_tls no longer does anything; warn so users can drop it.
	if k.EnableTLS != nil {
		log.Printf("W! [kafka] enable_tls is deprecated, and the setting does nothing, you can safely remove it from the config")
	}

	if k.Version != "" {
		v, err := sarama.ParseKafkaVersion(k.Version)
		if err != nil {
			return err
		}
		cfg.Version = v
	}

	// Default the client id, allowing the config value to override it.
	cfg.ClientID = "Telegraf"
	if k.ClientID != "" {
		cfg.ClientID = k.ClientID
	}

	cfg.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)

	tc, err := k.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}
	if tc != nil {
		cfg.Net.TLS.Config = tc
		cfg.Net.TLS.Enable = true
	}

	return k.SetSASLConfig(cfg)
}

View File

@ -35,7 +35,7 @@ and use the old zookeeper connection method.
# insecure_skip_verify = false
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
@ -62,6 +62,14 @@ and use the old zookeeper connection method.
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
@ -36,7 +35,6 @@ const sampleConfig = `
# version = ""
## Optional TLS Config
# enable_tls = true
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
@ -44,7 +42,7 @@ const sampleConfig = `
# insecure_skip_verify = false
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
@ -71,6 +69,15 @@ const sampleConfig = `
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
@ -110,7 +117,6 @@ type semaphore chan empty
type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
ClientID string `toml:"client_id"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
@ -118,12 +124,8 @@ type KafkaConsumer struct {
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Version string `toml:"version"`
kafka.SASLAuth
EnableTLS *bool `toml:"enable_tls"`
tls.ClientConfig
kafka.ReadConfig
Log telegraf.Logger `toml:"-"`
@ -173,50 +175,14 @@ func (k *KafkaConsumer) Init() error {
}
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// Kafka version 0.10.2.0 is required for consumer groups.
config.Version = sarama.V0_10_2_0
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}
config.Version = version
}
if k.EnableTLS != nil && *k.EnableTLS {
config.Net.TLS.Enable = true
}
tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
if err := k.SetConfig(config); err != nil {
return err
}
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
// To maintain backwards compatibility, if the enable_tls option is not
// set TLS is enabled if a non-default TLS config is used.
if k.EnableTLS == nil {
k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
config.Net.TLS.Enable = true
}
}
if err := k.SetSASLConfig(config); err != nil {
return err
}
if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Consumer.Offsets.Initial = sarama.OffsetOldest

View File

@ -7,6 +7,7 @@ import (
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
@ -68,8 +69,12 @@ func TestInit(t *testing.T) {
{
name: "parses valid version string",
plugin: &KafkaConsumer{
Version: "1.0.0",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "1.0.0",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.Version, sarama.V1_0_0_0)
@ -78,16 +83,24 @@ func TestInit(t *testing.T) {
{
name: "invalid version string",
plugin: &KafkaConsumer{
Version: "100",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "100",
},
},
Log: testutil.Logger{},
},
initError: true,
},
{
name: "custom client_id",
plugin: &KafkaConsumer{
ClientID: "custom",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientID: "custom",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.ClientID, "custom")
@ -123,8 +136,12 @@ func TestInit(t *testing.T) {
{
name: "default tls with a tls config",
plugin: &KafkaConsumer{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
@ -133,24 +150,17 @@ func TestInit(t *testing.T) {
},
},
{
name: "disable tls",
name: "Insecure tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := false; return &v }(),
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.False(t, plugin.config.Net.TLS.Enable)
},
},
{
name: "enable tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := true; return &v }(),
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.True(t, plugin.config.Net.TLS.Enable)
},

View File

@ -72,13 +72,18 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## routing_key = "telegraf"
# routing_key = ""
## CompressionCodec represents the various compression codecs recognized by
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Idempotent Writes
## If enabled, exactly one copy of each message is written.
# idempotent_writes = false
## RequiredAcks is used in Produce Requests to tell the broker how many
## replica acknowledgements it must see before responding

View File

@ -11,7 +11,6 @@ import (
"github.com/gofrs/uuid"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
@ -25,20 +24,13 @@ var ValidTopicSuffixMethods = []string{
var zeroTime = time.Unix(0, 0)
type Kafka struct {
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
ClientID string `toml:"client_id"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
CompressionCodec int `toml:"compression_codec"`
RequiredAcks int `toml:"required_acks"`
MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"`
Version string `toml:"version"`
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
// Legacy TLS config options
// TLS client certificate
@ -48,10 +40,7 @@ type Kafka struct {
// TLS certificate authority
CA string
EnableTLS *bool `toml:"enable_tls"`
tlsint.ClientConfig
kafka.SASLAuth
kafka.WriteConfig
Log telegraf.Logger `toml:"-"`
@ -158,14 +147,19 @@ var sampleConfig = `
## routing_key = "telegraf"
# routing_key = ""
## CompressionCodec represents the various compression codecs recognized by
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Idempotent Writes
## If enabled, exactly one copy of each message is written.
# idempotent_writes = false
## RequiredAcks is used in Produce Requests to tell the broker how many
## replica acknowledgements it must see before responding
## 0 : the producer never waits for an acknowledgement from the broker.
@ -191,7 +185,6 @@ var sampleConfig = `
# max_message_bytes = 1000000
## Optional TLS Config
# enable_tls = true
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
@ -278,34 +271,15 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}
func (k *Kafka) Connect() error {
func (k *Kafka) Init() error {
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil {
return err
}
config := sarama.NewConfig()
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}
config.Version = version
}
if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
config.Producer.Retry.Max = k.MaxRetry
config.Producer.Return.Successes = true
if k.MaxMessageBytes > 0 {
config.Producer.MaxMessageBytes = k.MaxMessageBytes
if err := k.SetConfig(config); err != nil {
return err
}
// Legacy support ssl config
@ -315,30 +289,6 @@ func (k *Kafka) Connect() error {
k.TLSKey = k.Key
}
if k.EnableTLS != nil && *k.EnableTLS {
config.Net.TLS.Enable = true
}
tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
// To maintain backwards compatibility, if the enable_tls option is not
// set TLS is enabled if a non-default TLS config is used.
if k.EnableTLS == nil {
k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
config.Net.TLS.Enable = true
}
}
if err := k.SetSASLConfig(config); err != nil {
return err
}
producer, err := k.producerFunc(k.Brokers, config)
if err != nil {
return err
@ -347,6 +297,10 @@ func (k *Kafka) Connect() error {
return nil
}
func (k *Kafka) Connect() error {
return nil
}
func (k *Kafka) Close() error {
return k.producer.Close()
}
@ -436,8 +390,10 @@ func init() {
sarama.Logger = &DebugLogger{}
outputs.Add("kafka", func() telegraf.Output {
return &Kafka{
MaxRetry: 3,
RequiredAcks: -1,
WriteConfig: kafka.WriteConfig{
MaxRetry: 3,
RequiredAcks: -1,
},
producerFunc: sarama.NewSyncProducer,
}
})

View File

@ -25,13 +25,16 @@ func TestConnectAndWrite(t *testing.T) {
brokers := []string{testutil.GetLocalHost() + ":9092"}
s, _ := serializers.NewInfluxSerializer()
k := &Kafka{
Brokers: brokers,
Topic: "Test",
serializer: s,
Brokers: brokers,
Topic: "Test",
serializer: s,
producerFunc: sarama.NewSyncProducer,
}
// Verify that we can connect to the Kafka broker
err := k.Connect()
err := k.Init()
require.NoError(t, err)
err = k.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the kafka broker