From e83a1656351058571a7235471b9cdf46326c010a Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 28 Oct 2020 12:16:59 -0400 Subject: [PATCH] kafka sasl-mechanism auth support for SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (#8318) --- docs/LICENSE_OF_DEPENDENCIES.md | 2 + go.mod | 1 + go.sum | 2 + plugins/common/kafka/sasl.go | 84 ++++++++++++++ plugins/common/kafka/scram_client.go | 36 ++++++ plugins/inputs/kafka_consumer/README.md | 17 +++ .../inputs/kafka_consumer/kafka_consumer.go | 34 +++--- plugins/outputs/kafka/README.md | 17 +++ plugins/outputs/kafka/kafka.go | 104 +++++++++--------- 9 files changed, 235 insertions(+), 62 deletions(-) create mode 100644 plugins/common/kafka/scram_client.go diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index a6ade91a5..66dc38b43 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -141,6 +141,8 @@ following works: - github.com/wavefronthq/wavefront-sdk-go [Apache License 2.0](https://github.com/wavefrontHQ/wavefront-sdk-go/blob/master/LICENSE) - github.com/wvanbergen/kafka [MIT License](https://github.com/wvanbergen/kafka/blob/master/LICENSE) - github.com/wvanbergen/kazoo-go [MIT License](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE) +- github.com/xdg/scram [Apache License 2.0](https://github.com/xdg-go/scram/blob/master/LICENSE) +- github.com/xdg/stringprep [Apache License 2.0](https://github.com/xdg-go/stringprep/blob/master/LICENSE) - github.com/yuin/gopher-lua [MIT License](https://github.com/yuin/gopher-lua/blob/master/LICENSE) - go.opencensus.io [Apache License 2.0](https://github.com/census-instrumentation/opencensus-go/blob/master/LICENSE) - go.starlark.net [BSD 3-Clause "New" or "Revised" License](https://github.com/google/starlark-go/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 92de24ee1..ecc7f1d9f 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,7 @@ require ( github.com/wavefronthq/wavefront-sdk-go v0.9.2 github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect go.starlark.net v0.0.0-20200901195727-6e684ef5eeee golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect diff --git a/go.sum b/go.sum index 1297eec30..37d106f3f 100644 --- a/go.sum +++ b/go.sum @@ -592,7 +592,9 @@ github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf h1:TOV5PC6fIWwFOF github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf/go.mod h1:nxx7XRXbR9ykhnC8lXqQyJS0rfvJGxKyKw/sT1YOttg= github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk= github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 h1:f6CCNiTjQZ0uWK4jPwhwYB8QIGGfn0ssD9kVzRUUUpk= diff --git a/plugins/common/kafka/sasl.go b/plugins/common/kafka/sasl.go index cd3358b38..e565aea58 100644 --- a/plugins/common/kafka/sasl.go +++ b/plugins/common/kafka/sasl.go @@ -6,6 +6,78 @@ import ( "github.com/Shopify/sarama" ) +type SASLAuth struct { + SASLUsername string `toml:"sasl_username"` + SASLPassword string `toml:"sasl_password"` + SASLMechanism string `toml:"sasl_mechanism"` + SASLVersion *int `toml:"sasl_version"` + + // GSSAPI config + SASLGSSAPIServiceName string `toml:"sasl_gssapi_service_name"` + SASLGSSAPIAuthType string `toml:"sasl_gssapi_auth_type"` + SASLGSSAPIDisablePAFXFAST bool `toml:"sasl_gssapi_disable_pafxfast"` + SASLGSSAPIKerberosConfigPath string `toml:"sasl_gssapi_kerberos_config_path"` + SASLGSSAPIKeyTabPath string `toml:"sasl_gssapi_key_tab_path"` + SASLGSSAPIRealm string `toml:"sasl_gssapi_realm"` + + // OAUTHBEARER config. experimental. undoubtedly this is not good enough. + SASLAccessToken string `toml:"sasl_access_token"` +} + +// SetSASLConfig configures SASL for kafka (sarama) +func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error { + config.Net.SASL.User = k.SASLUsername + config.Net.SASL.Password = k.SASLPassword + + if k.SASLMechanism != "" { + config.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism) + switch config.Net.SASL.Mechanism { + case sarama.SASLTypeSCRAMSHA256: + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + case sarama.SASLTypeSCRAMSHA512: + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + case sarama.SASLTypeOAuth: + config.Net.SASL.TokenProvider = k // use self as token provider. + case sarama.SASLTypeGSSAPI: + config.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName + config.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType) + config.Net.SASL.GSSAPI.Username = k.SASLUsername + config.Net.SASL.GSSAPI.Password = k.SASLPassword + config.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST + config.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath + config.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath + config.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm + + case sarama.SASLTypePlaintext: + // nothing. + default: + } + } + + if k.SASLUsername != "" || k.SASLMechanism != "" { + config.Net.SASL.Enable = true + + version, err := SASLVersion(config.Version, k.SASLVersion) + if err != nil { + return err + } + config.Net.SASL.Version = version + } + return nil +} + +// Token does nothing smart, it just grabs a hard-coded token from config. +func (k *SASLAuth) Token() (*sarama.AccessToken, error) { + return &sarama.AccessToken{ + Token: k.SASLAccessToken, + Extensions: map[string]string{}, + }, nil +} + func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) { if saslVersion == nil { if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) { @@ -23,3 +95,15 @@ func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, err return 0, errors.New("invalid SASL version") } } + +func gssapiAuthType(authType string) int { + switch authType { + case "KRB5_USER_AUTH": + return sarama.KRB5_USER_AUTH + case "KRB5_KEYTAB_AUTH": + return sarama.KRB5_KEYTAB_AUTH + default: + return 0 + } + +} diff --git a/plugins/common/kafka/scram_client.go b/plugins/common/kafka/scram_client.go new file mode 100644 index 000000000..f6aa9d6c4 --- /dev/null +++ b/plugins/common/kafka/scram_client.go @@ -0,0 +1,36 @@ +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index dec39cc32..3535f8fce 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -39,6 +39,23 @@ and use the old zookeeper connection method. # sasl_username = "kafka" # sasl_password = "secret" + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + ## SASL protocol version. When connecting to Azure EventHub set to 0. # sasl_version = 1 diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 0fd7d3693..a0b4b41cf 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -48,6 +48,23 @@ const sampleConfig = ` # sasl_username = "kafka" # sasl_password = "secret" + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + ## SASL protocol version. When connecting to Azure EventHub set to 0. # sasl_version = 1 @@ -102,9 +119,8 @@ type KafkaConsumer struct { Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` Version string `toml:"version"` - SASLPassword string `toml:"sasl_password"` - SASLUsername string `toml:"sasl_username"` - SASLVersion *int `toml:"sasl_version"` + + kafka.SASLAuth EnableTLS *bool `toml:"enable_tls"` tls.ClientConfig @@ -191,16 +207,8 @@ func (k *KafkaConsumer) Init() error { } } - if k.SASLUsername != "" && k.SASLPassword != "" { - config.Net.SASL.User = k.SASLUsername - config.Net.SASL.Password = k.SASLPassword - config.Net.SASL.Enable = true - - version, err := kafka.SASLVersion(config.Version, k.SASLVersion) - if err != nil { - return err - } - config.Net.SASL.Version = version + if err := k.SetSASLConfig(config); err != nil { + return err } if k.ClientID != "" { diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index d1cc9f0cb..8c16ee054 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -111,6 +111,23 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm # sasl_username = "kafka" # sasl_password = "secret" + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + ## SASL protocol version. When connecting to Azure EventHub set to 0. # sasl_version = 1 diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 26a0c5bdb..5fdfae48d 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -24,53 +24,50 @@ var ValidTopicSuffixMethods = []string{ var zeroTime = time.Unix(0, 0) -type ( - Kafka struct { - Brokers []string `toml:"brokers"` - Topic string `toml:"topic"` - TopicTag string `toml:"topic_tag"` - ExcludeTopicTag bool `toml:"exclude_topic_tag"` - ClientID string `toml:"client_id"` - TopicSuffix TopicSuffix `toml:"topic_suffix"` - RoutingTag string `toml:"routing_tag"` - RoutingKey string `toml:"routing_key"` - CompressionCodec int `toml:"compression_codec"` - RequiredAcks int `toml:"required_acks"` - MaxRetry int `toml:"max_retry"` - MaxMessageBytes int `toml:"max_message_bytes"` +type Kafka struct { + Brokers []string `toml:"brokers"` + Topic string `toml:"topic"` + TopicTag string `toml:"topic_tag"` + ExcludeTopicTag bool `toml:"exclude_topic_tag"` + ClientID string `toml:"client_id"` + TopicSuffix TopicSuffix `toml:"topic_suffix"` + RoutingTag string `toml:"routing_tag"` + RoutingKey string `toml:"routing_key"` + CompressionCodec int `toml:"compression_codec"` + RequiredAcks int `toml:"required_acks"` + MaxRetry int `toml:"max_retry"` + MaxMessageBytes int `toml:"max_message_bytes"` - Version string `toml:"version"` + Version string `toml:"version"` - // Legacy TLS config options - // TLS client certificate - Certificate string - // TLS client key - Key string - // TLS certificate authority - CA string + // Legacy TLS config options + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string - EnableTLS *bool `toml:"enable_tls"` - tlsint.ClientConfig + EnableTLS *bool `toml:"enable_tls"` + tlsint.ClientConfig - SASLUsername string `toml:"sasl_username"` - SASLPassword string `toml:"sasl_password"` - SASLVersion *int `toml:"sasl_version"` + kafka.SASLAuth - Log telegraf.Logger `toml:"-"` + Log telegraf.Logger `toml:"-"` - tlsConfig tls.Config + tlsConfig tls.Config - producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) - producer sarama.SyncProducer + producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) + producer sarama.SyncProducer - serializer serializers.Serializer - } - TopicSuffix struct { - Method string `toml:"method"` - Keys []string `toml:"keys"` - Separator string `toml:"separator"` - } -) + serializer serializers.Serializer +} + +type TopicSuffix struct { + Method string `toml:"method"` + Keys []string `toml:"keys"` + Separator string `toml:"separator"` +} // DebugLogger logs messages from sarama at the debug level. type DebugLogger struct { @@ -205,6 +202,23 @@ var sampleConfig = ` # sasl_username = "kafka" # sasl_password = "secret" + ## Optional SASL: + ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + ## (defaults to PLAIN) + # sasl_mechanism = "" + + ## used if sasl_mechanism is GSSAPI (experimental) + # sasl_gssapi_service_name = "" + # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH + # sasl_gssapi_auth_type = "KRB5_USER_AUTH" + # sasl_gssapi_kerberos_config_path = "/" + # sasl_gssapi_realm = "realm" + # sasl_gssapi_key_tab_path = "" + # sasl_gssapi_disable_pafxfast = false + + ## used if sasl_mechanism is OAUTHBEARER (experimental) + # sasl_access_token = "" + ## SASL protocol version. When connecting to Azure EventHub set to 0. # sasl_version = 1 @@ -321,16 +335,8 @@ func (k *Kafka) Connect() error { } } - if k.SASLUsername != "" && k.SASLPassword != "" { - config.Net.SASL.User = k.SASLUsername - config.Net.SASL.Password = k.SASLPassword - config.Net.SASL.Enable = true - - version, err := kafka.SASLVersion(config.Version, k.SASLVersion) - if err != nil { - return err - } - config.Net.SASL.Version = version + if err := k.SetSASLConfig(config); err != nil { + return err } producer, err := k.producerFunc(k.Brokers, config)