kafka sasl-mechanism auth support for SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (#8318)
This commit is contained in:
parent
1313f2314f
commit
e83a165635
|
|
@ -141,6 +141,8 @@ following works:
|
||||||
- github.com/wavefronthq/wavefront-sdk-go [Apache License 2.0](https://github.com/wavefrontHQ/wavefront-sdk-go/blob/master/LICENSE)
|
- github.com/wavefronthq/wavefront-sdk-go [Apache License 2.0](https://github.com/wavefrontHQ/wavefront-sdk-go/blob/master/LICENSE)
|
||||||
- github.com/wvanbergen/kafka [MIT License](https://github.com/wvanbergen/kafka/blob/master/LICENSE)
|
- github.com/wvanbergen/kafka [MIT License](https://github.com/wvanbergen/kafka/blob/master/LICENSE)
|
||||||
- github.com/wvanbergen/kazoo-go [MIT License](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE)
|
- github.com/wvanbergen/kazoo-go [MIT License](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE)
|
||||||
|
- github.com/xdg/scram [Apache License 2.0](https://github.com/xdg-go/scram/blob/master/LICENSE)
|
||||||
|
- github.com/xdg/stringprep [Apache License 2.0](https://github.com/xdg-go/stringprep/blob/master/LICENSE)
|
||||||
- github.com/yuin/gopher-lua [MIT License](https://github.com/yuin/gopher-lua/blob/master/LICENSE)
|
- github.com/yuin/gopher-lua [MIT License](https://github.com/yuin/gopher-lua/blob/master/LICENSE)
|
||||||
- go.opencensus.io [Apache License 2.0](https://github.com/census-instrumentation/opencensus-go/blob/master/LICENSE)
|
- go.opencensus.io [Apache License 2.0](https://github.com/census-instrumentation/opencensus-go/blob/master/LICENSE)
|
||||||
- go.starlark.net [BSD 3-Clause "New" or "Revised" License](https://github.com/google/starlark-go/blob/master/LICENSE)
|
- go.starlark.net [BSD 3-Clause "New" or "Revised" License](https://github.com/google/starlark-go/blob/master/LICENSE)
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -127,6 +127,7 @@ require (
|
||||||
github.com/wavefronthq/wavefront-sdk-go v0.9.2
|
github.com/wavefronthq/wavefront-sdk-go v0.9.2
|
||||||
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf
|
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf
|
||||||
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
|
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
|
||||||
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
|
||||||
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
|
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
|
||||||
go.starlark.net v0.0.0-20200901195727-6e684ef5eeee
|
go.starlark.net v0.0.0-20200901195727-6e684ef5eeee
|
||||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
|
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -592,7 +592,9 @@ github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf h1:TOV5PC6fIWwFOF
|
||||||
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf/go.mod h1:nxx7XRXbR9ykhnC8lXqQyJS0rfvJGxKyKw/sT1YOttg=
|
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf/go.mod h1:nxx7XRXbR9ykhnC8lXqQyJS0rfvJGxKyKw/sT1YOttg=
|
||||||
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk=
|
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk=
|
||||||
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4=
|
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4=
|
||||||
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
||||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
||||||
|
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
||||||
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
|
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
|
||||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 h1:f6CCNiTjQZ0uWK4jPwhwYB8QIGGfn0ssD9kVzRUUUpk=
|
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 h1:f6CCNiTjQZ0uWK4jPwhwYB8QIGGfn0ssD9kVzRUUUpk=
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,78 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type SASLAuth struct {
|
||||||
|
SASLUsername string `toml:"sasl_username"`
|
||||||
|
SASLPassword string `toml:"sasl_password"`
|
||||||
|
SASLMechanism string `toml:"sasl_mechanism"`
|
||||||
|
SASLVersion *int `toml:"sasl_version"`
|
||||||
|
|
||||||
|
// GSSAPI config
|
||||||
|
SASLGSSAPIServiceName string `toml:"sasl_gssapi_service_name"`
|
||||||
|
SASLGSSAPIAuthType string `toml:"sasl_gssapi_auth_type"`
|
||||||
|
SASLGSSAPIDisablePAFXFAST bool `toml:"sasl_gssapi_disable_pafxfast"`
|
||||||
|
SASLGSSAPIKerberosConfigPath string `toml:"sasl_gssapi_kerberos_config_path"`
|
||||||
|
SASLGSSAPIKeyTabPath string `toml:"sasl_gssapi_key_tab_path"`
|
||||||
|
SASLGSSAPIRealm string `toml:"sasl_gssapi_realm"`
|
||||||
|
|
||||||
|
// OAUTHBEARER config. experimental. undoubtedly this is not good enough.
|
||||||
|
SASLAccessToken string `toml:"sasl_access_token"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetSASLConfig configures SASL for kafka (sarama)
|
||||||
|
func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error {
|
||||||
|
config.Net.SASL.User = k.SASLUsername
|
||||||
|
config.Net.SASL.Password = k.SASLPassword
|
||||||
|
|
||||||
|
if k.SASLMechanism != "" {
|
||||||
|
config.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism)
|
||||||
|
switch config.Net.SASL.Mechanism {
|
||||||
|
case sarama.SASLTypeSCRAMSHA256:
|
||||||
|
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||||
|
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
|
||||||
|
}
|
||||||
|
case sarama.SASLTypeSCRAMSHA512:
|
||||||
|
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||||
|
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||||
|
}
|
||||||
|
case sarama.SASLTypeOAuth:
|
||||||
|
config.Net.SASL.TokenProvider = k // use self as token provider.
|
||||||
|
case sarama.SASLTypeGSSAPI:
|
||||||
|
config.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
|
||||||
|
config.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
|
||||||
|
config.Net.SASL.GSSAPI.Username = k.SASLUsername
|
||||||
|
config.Net.SASL.GSSAPI.Password = k.SASLPassword
|
||||||
|
config.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
|
||||||
|
config.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
|
||||||
|
config.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
|
||||||
|
config.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
|
||||||
|
|
||||||
|
case sarama.SASLTypePlaintext:
|
||||||
|
// nothing.
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if k.SASLUsername != "" || k.SASLMechanism != "" {
|
||||||
|
config.Net.SASL.Enable = true
|
||||||
|
|
||||||
|
version, err := SASLVersion(config.Version, k.SASLVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
config.Net.SASL.Version = version
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Token does nothing smart, it just grabs a hard-coded token from config.
|
||||||
|
func (k *SASLAuth) Token() (*sarama.AccessToken, error) {
|
||||||
|
return &sarama.AccessToken{
|
||||||
|
Token: k.SASLAccessToken,
|
||||||
|
Extensions: map[string]string{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) {
|
func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) {
|
||||||
if saslVersion == nil {
|
if saslVersion == nil {
|
||||||
if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) {
|
if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) {
|
||||||
|
|
@ -23,3 +95,15 @@ func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, err
|
||||||
return 0, errors.New("invalid SASL version")
|
return 0, errors.New("invalid SASL version")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func gssapiAuthType(authType string) int {
|
||||||
|
switch authType {
|
||||||
|
case "KRB5_USER_AUTH":
|
||||||
|
return sarama.KRB5_USER_AUTH
|
||||||
|
case "KRB5_KEYTAB_AUTH":
|
||||||
|
return sarama.KRB5_KEYTAB_AUTH
|
||||||
|
default:
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"crypto/sha512"
|
||||||
|
"hash"
|
||||||
|
|
||||||
|
"github.com/xdg/scram"
|
||||||
|
)
|
||||||
|
|
||||||
|
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
|
||||||
|
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
|
||||||
|
|
||||||
|
type XDGSCRAMClient struct {
|
||||||
|
*scram.Client
|
||||||
|
*scram.ClientConversation
|
||||||
|
scram.HashGeneratorFcn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
|
||||||
|
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
x.ClientConversation = x.Client.NewConversation()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
|
||||||
|
response, err = x.ClientConversation.Step(challenge)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *XDGSCRAMClient) Done() bool {
|
||||||
|
return x.ClientConversation.Done()
|
||||||
|
}
|
||||||
|
|
@ -39,6 +39,23 @@ and use the old zookeeper connection method.
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
||||||
|
## Optional SASL:
|
||||||
|
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
|
||||||
|
## (defaults to PLAIN)
|
||||||
|
# sasl_mechanism = ""
|
||||||
|
|
||||||
|
## used if sasl_mechanism is GSSAPI (experimental)
|
||||||
|
# sasl_gssapi_service_name = ""
|
||||||
|
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
|
||||||
|
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
|
||||||
|
# sasl_gssapi_kerberos_config_path = "/"
|
||||||
|
# sasl_gssapi_realm = "realm"
|
||||||
|
# sasl_gssapi_key_tab_path = ""
|
||||||
|
# sasl_gssapi_disable_pafxfast = false
|
||||||
|
|
||||||
|
## used if sasl_mechanism is OAUTHBEARER (experimental)
|
||||||
|
# sasl_access_token = ""
|
||||||
|
|
||||||
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
||||||
# sasl_version = 1
|
# sasl_version = 1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,23 @@ const sampleConfig = `
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
||||||
|
## Optional SASL:
|
||||||
|
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
|
||||||
|
## (defaults to PLAIN)
|
||||||
|
# sasl_mechanism = ""
|
||||||
|
|
||||||
|
## used if sasl_mechanism is GSSAPI (experimental)
|
||||||
|
# sasl_gssapi_service_name = ""
|
||||||
|
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
|
||||||
|
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
|
||||||
|
# sasl_gssapi_kerberos_config_path = "/"
|
||||||
|
# sasl_gssapi_realm = "realm"
|
||||||
|
# sasl_gssapi_key_tab_path = ""
|
||||||
|
# sasl_gssapi_disable_pafxfast = false
|
||||||
|
|
||||||
|
## used if sasl_mechanism is OAUTHBEARER (experimental)
|
||||||
|
# sasl_access_token = ""
|
||||||
|
|
||||||
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
||||||
# sasl_version = 1
|
# sasl_version = 1
|
||||||
|
|
||||||
|
|
@ -102,9 +119,8 @@ type KafkaConsumer struct {
|
||||||
Topics []string `toml:"topics"`
|
Topics []string `toml:"topics"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
Version string `toml:"version"`
|
Version string `toml:"version"`
|
||||||
SASLPassword string `toml:"sasl_password"`
|
|
||||||
SASLUsername string `toml:"sasl_username"`
|
kafka.SASLAuth
|
||||||
SASLVersion *int `toml:"sasl_version"`
|
|
||||||
|
|
||||||
EnableTLS *bool `toml:"enable_tls"`
|
EnableTLS *bool `toml:"enable_tls"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
@ -191,16 +207,8 @@ func (k *KafkaConsumer) Init() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if k.SASLUsername != "" && k.SASLPassword != "" {
|
if err := k.SetSASLConfig(config); err != nil {
|
||||||
config.Net.SASL.User = k.SASLUsername
|
return err
|
||||||
config.Net.SASL.Password = k.SASLPassword
|
|
||||||
config.Net.SASL.Enable = true
|
|
||||||
|
|
||||||
version, err := kafka.SASLVersion(config.Version, k.SASLVersion)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
config.Net.SASL.Version = version
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if k.ClientID != "" {
|
if k.ClientID != "" {
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,23 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
||||||
|
## Optional SASL:
|
||||||
|
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
|
||||||
|
## (defaults to PLAIN)
|
||||||
|
# sasl_mechanism = ""
|
||||||
|
|
||||||
|
## used if sasl_mechanism is GSSAPI (experimental)
|
||||||
|
# sasl_gssapi_service_name = ""
|
||||||
|
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
|
||||||
|
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
|
||||||
|
# sasl_gssapi_kerberos_config_path = "/"
|
||||||
|
# sasl_gssapi_realm = "realm"
|
||||||
|
# sasl_gssapi_key_tab_path = ""
|
||||||
|
# sasl_gssapi_disable_pafxfast = false
|
||||||
|
|
||||||
|
## used if sasl_mechanism is OAUTHBEARER (experimental)
|
||||||
|
# sasl_access_token = ""
|
||||||
|
|
||||||
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
||||||
# sasl_version = 1
|
# sasl_version = 1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,53 +24,50 @@ var ValidTopicSuffixMethods = []string{
|
||||||
|
|
||||||
var zeroTime = time.Unix(0, 0)
|
var zeroTime = time.Unix(0, 0)
|
||||||
|
|
||||||
type (
|
type Kafka struct {
|
||||||
Kafka struct {
|
Brokers []string `toml:"brokers"`
|
||||||
Brokers []string `toml:"brokers"`
|
Topic string `toml:"topic"`
|
||||||
Topic string `toml:"topic"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
|
||||||
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
|
ClientID string `toml:"client_id"`
|
||||||
ClientID string `toml:"client_id"`
|
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
||||||
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingKey string `toml:"routing_key"`
|
||||||
RoutingKey string `toml:"routing_key"`
|
CompressionCodec int `toml:"compression_codec"`
|
||||||
CompressionCodec int `toml:"compression_codec"`
|
RequiredAcks int `toml:"required_acks"`
|
||||||
RequiredAcks int `toml:"required_acks"`
|
MaxRetry int `toml:"max_retry"`
|
||||||
MaxRetry int `toml:"max_retry"`
|
MaxMessageBytes int `toml:"max_message_bytes"`
|
||||||
MaxMessageBytes int `toml:"max_message_bytes"`
|
|
||||||
|
|
||||||
Version string `toml:"version"`
|
Version string `toml:"version"`
|
||||||
|
|
||||||
// Legacy TLS config options
|
// Legacy TLS config options
|
||||||
// TLS client certificate
|
// TLS client certificate
|
||||||
Certificate string
|
Certificate string
|
||||||
// TLS client key
|
// TLS client key
|
||||||
Key string
|
Key string
|
||||||
// TLS certificate authority
|
// TLS certificate authority
|
||||||
CA string
|
CA string
|
||||||
|
|
||||||
EnableTLS *bool `toml:"enable_tls"`
|
EnableTLS *bool `toml:"enable_tls"`
|
||||||
tlsint.ClientConfig
|
tlsint.ClientConfig
|
||||||
|
|
||||||
SASLUsername string `toml:"sasl_username"`
|
kafka.SASLAuth
|
||||||
SASLPassword string `toml:"sasl_password"`
|
|
||||||
SASLVersion *int `toml:"sasl_version"`
|
|
||||||
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
tlsConfig tls.Config
|
tlsConfig tls.Config
|
||||||
|
|
||||||
producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
|
producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
|
||||||
producer sarama.SyncProducer
|
producer sarama.SyncProducer
|
||||||
|
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
}
|
}
|
||||||
TopicSuffix struct {
|
|
||||||
Method string `toml:"method"`
|
type TopicSuffix struct {
|
||||||
Keys []string `toml:"keys"`
|
Method string `toml:"method"`
|
||||||
Separator string `toml:"separator"`
|
Keys []string `toml:"keys"`
|
||||||
}
|
Separator string `toml:"separator"`
|
||||||
)
|
}
|
||||||
|
|
||||||
// DebugLogger logs messages from sarama at the debug level.
|
// DebugLogger logs messages from sarama at the debug level.
|
||||||
type DebugLogger struct {
|
type DebugLogger struct {
|
||||||
|
|
@ -205,6 +202,23 @@ var sampleConfig = `
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
||||||
|
## Optional SASL:
|
||||||
|
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
|
||||||
|
## (defaults to PLAIN)
|
||||||
|
# sasl_mechanism = ""
|
||||||
|
|
||||||
|
## used if sasl_mechanism is GSSAPI (experimental)
|
||||||
|
# sasl_gssapi_service_name = ""
|
||||||
|
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
|
||||||
|
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
|
||||||
|
# sasl_gssapi_kerberos_config_path = "/"
|
||||||
|
# sasl_gssapi_realm = "realm"
|
||||||
|
# sasl_gssapi_key_tab_path = ""
|
||||||
|
# sasl_gssapi_disable_pafxfast = false
|
||||||
|
|
||||||
|
## used if sasl_mechanism is OAUTHBEARER (experimental)
|
||||||
|
# sasl_access_token = ""
|
||||||
|
|
||||||
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
## SASL protocol version. When connecting to Azure EventHub set to 0.
|
||||||
# sasl_version = 1
|
# sasl_version = 1
|
||||||
|
|
||||||
|
|
@ -321,16 +335,8 @@ func (k *Kafka) Connect() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if k.SASLUsername != "" && k.SASLPassword != "" {
|
if err := k.SetSASLConfig(config); err != nil {
|
||||||
config.Net.SASL.User = k.SASLUsername
|
return err
|
||||||
config.Net.SASL.Password = k.SASLPassword
|
|
||||||
config.Net.SASL.Enable = true
|
|
||||||
|
|
||||||
version, err := kafka.SASLVersion(config.Version, k.SASLVersion)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
config.Net.SASL.Version = version
|
|
||||||
}
|
}
|
||||||
|
|
||||||
producer, err := k.producerFunc(k.Brokers, config)
|
producer, err := k.producerFunc(k.Brokers, config)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue