feat(common.kafka): Add AWS-MSK-IAM SASL authentication (#16524)

This commit is contained in:
Sven Rebhan 2025-04-23 16:34:16 +02:00 committed by GitHub
parent b715237606
commit 403199fb46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 217 additions and 71 deletions

View File

@ -68,6 +68,7 @@ following works:
- github.com/armon/go-metrics [MIT License](https://github.com/armon/go-metrics/blob/master/LICENSE)
- github.com/awnumar/memcall [Apache License 2.0](https://github.com/awnumar/memcall/blob/master/LICENSE)
- github.com/awnumar/memguard [Apache License 2.0](https://github.com/awnumar/memguard/blob/master/LICENSE)
- github.com/aws/aws-msk-iam-sasl-signer-go [Apache License 2.0](https://github.com/aws/aws-msk-iam-sasl-signer-go/blob/main/LICENSE)
- github.com/aws/aws-sdk-go-v2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/aws/protocol/eventstream/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt)

1
go.mod
View File

@ -48,6 +48,7 @@ require (
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/awnumar/memguard v0.22.5
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/config v1.29.14
github.com/aws/aws-sdk-go-v2/credentials v1.17.67

2
go.sum
View File

@ -876,6 +876,8 @@ github.com/awnumar/memcall v0.3.0 h1:8b/3Sptrtgejj2kLgL6M5F2r4OzTf19CTllO+gIXUg8
github.com/awnumar/memcall v0.3.0/go.mod h1:8xOx1YbfyuCg3Fy6TO8DK0kZUua3V42/goA5Ru47E8w=
github.com/awnumar/memguard v0.22.5 h1:PH7sbUVERS5DdXh3+mLo8FDcl1eIeVjJVYMnyuYpvuI=
github.com/awnumar/memguard v0.22.5/go.mod h1:+APmZGThMBWjnMlKiSM1X7MVpbIVewen2MTkqWkA/zE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1 h1:nMp7diZObd4XEVUR0pEvn7/E13JIgManMX79Q6quV6E=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg=
github.com/aws/aws-sdk-go v1.44.263/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=

View File

@ -24,8 +24,11 @@ type SASLAuth struct {
SASLGSSAPIKeyTabPath string `toml:"sasl_gssapi_key_tab_path"`
SASLGSSAPIRealm string `toml:"sasl_gssapi_realm"`
// OAUTHBEARER config
// OAUTHBEARER token based config
SASLAccessToken config.Secret `toml:"sasl_access_token"`
// OAUTHBEARER AWS MSK IAM based config
SASLOAuthAWSMSKIAMConfig
}
// SetSASLConfig configures SASL for kafka (sarama)
@ -43,34 +46,44 @@ func (k *SASLAuth) SetSASLConfig(cfg *sarama.Config) error {
cfg.Net.SASL.Password = password.String()
defer password.Destroy()
if k.SASLMechanism != "" {
cfg.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism)
switch cfg.Net.SASL.Mechanism {
case sarama.SASLTypeSCRAMSHA256:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case sarama.SASLTypeSCRAMSHA512:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
case sarama.SASLTypeOAuth:
cfg.Net.SASL.TokenProvider = k // use self as token provider.
case sarama.SASLTypeGSSAPI:
cfg.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
cfg.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
cfg.Net.SASL.GSSAPI.Username = username.String()
cfg.Net.SASL.GSSAPI.Password = password.String()
cfg.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
cfg.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
cfg.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
cfg.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
mechanism := k.SASLMechanism
case sarama.SASLTypePlaintext:
// nothing.
default:
switch k.SASLMechanism {
case sarama.SASLTypeSCRAMSHA256:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case sarama.SASLTypeSCRAMSHA512:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
case sarama.SASLTypeGSSAPI:
cfg.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
cfg.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
cfg.Net.SASL.GSSAPI.Username = username.String()
cfg.Net.SASL.GSSAPI.Password = password.String()
cfg.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
cfg.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
cfg.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
cfg.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
case sarama.SASLTypeOAuth: // OAUTHBEARER secret based auth
cfg.Net.SASL.TokenProvider = &oauthToken{
token: k.SASLAccessToken,
extensions: k.SASLExtensions,
}
case saslTypeOAuthAWSMSKIAM: // AWS-MSK-IAM based auth
p, err := k.SASLOAuthAWSMSKIAMConfig.tokenProvider(k.SASLExtensions)
if err != nil {
return fmt.Errorf("creating AWS MSK IAM token provider failed: %w", err)
}
mechanism = sarama.SASLTypeOAuth
cfg.Net.SASL.TokenProvider = p
case sarama.SASLTypePlaintext:
// nothing.
case "":
// no SASL
}
cfg.Net.SASL.Mechanism = sarama.SASLMechanism(mechanism)
if !k.SASLUsername.Empty() || k.SASLMechanism != "" {
cfg.Net.SASL.Enable = true
@ -84,19 +97,6 @@ func (k *SASLAuth) SetSASLConfig(cfg *sarama.Config) error {
return nil
}
// Token does nothing smart, it just grabs a hard-coded token from config.
func (k *SASLAuth) Token() (*sarama.AccessToken, error) {
token, err := k.SASLAccessToken.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
}
defer token.Destroy()
return &sarama.AccessToken{
Token: token.String(),
Extensions: k.SASLExtensions,
}, nil
}
func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) {
if saslVersion == nil {
if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) {

View File

@ -0,0 +1,79 @@
package kafka
import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
)
const saslTypeOAuthAWSMSKIAM = "AWS-MSK-IAM"
type SASLOAuthAWSMSKIAMConfig struct {
SASLAWSRegion string `toml:"sasl_aws_msk_iam_region"`
SASLAWSProfile string `toml:"sasl_aws_msk_iam_profile"`
SASLAWSRole string `toml:"sasl_aws_msk_iam_role"`
SASLAWSSession string `toml:"sasl_aws_msk_iam_session"`
}
func (c *SASLOAuthAWSMSKIAMConfig) tokenProvider(extensions map[string]string) (sarama.AccessTokenProvider, error) {
if c.SASLAWSRegion == "" {
return nil, errors.New("region cannot be empty")
}
if c.SASLAWSProfile != "" && (c.SASLAWSRole != "" || c.SASLAWSSession != "") {
return nil, errors.New("cannot mix profile based and role based authentication")
}
if c.SASLAWSProfile == "" && (c.SASLAWSRole == "" || c.SASLAWSSession == "") {
return nil, errors.New("both role and session must be set for role based authentication")
}
if c.SASLAWSProfile != "" {
return &oauthAWSMSKIAM{
generator: func(ctx context.Context) (string, error) {
t, _, err := signer.GenerateAuthTokenFromProfile(ctx, c.SASLAWSRegion, c.SASLAWSProfile)
return t, err
},
extensions: extensions,
}, nil
}
// Generate using role/session
if c.SASLAWSRole != "" && c.SASLAWSSession != "" {
return &oauthAWSMSKIAM{
generator: func(ctx context.Context) (string, error) {
t, _, err := signer.GenerateAuthTokenFromRole(ctx, c.SASLAWSRegion, c.SASLAWSRole, c.SASLAWSSession)
return t, err
},
extensions: extensions,
}, nil
}
return &oauthAWSMSKIAM{
generator: func(ctx context.Context) (string, error) {
t, _, err := signer.GenerateAuthToken(ctx, c.SASLAWSRegion)
return t, err
},
extensions: extensions,
}, nil
}
type oauthAWSMSKIAM struct {
generator func(context.Context) (string, error)
extensions map[string]string
}
// Token generates a token using the provided AWS MSK IAM generator function.
func (a *oauthAWSMSKIAM) Token() (*sarama.AccessToken, error) {
token, err := a.generator(context.Background())
if err != nil {
return nil, fmt.Errorf("getting AWS MSK IAM token failed: %w", err)
}
return &sarama.AccessToken{
Token: token,
Extensions: a.extensions,
}, nil
}

View File

@ -0,0 +1,27 @@
package kafka
import (
"fmt"
"github.com/IBM/sarama"
"github.com/influxdata/telegraf/config"
)
type oauthToken struct {
token config.Secret
extensions map[string]string
}
// Token does nothing smart, it just grabs a hard-coded token from config.
func (a *oauthToken) Token() (*sarama.AccessToken, error) {
token, err := a.token.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
}
defer token.Destroy()
return &sarama.AccessToken{
Token: token.String(),
Extensions: a.extensions,
}, nil
}

View File

@ -67,7 +67,7 @@ to use them.
## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
@ -77,7 +77,7 @@ to use them.
## Topics to consume.
topics = ["telegraf"]
## Topic regular expressions to consume. Matches will be added to topics.
## Topic regular expressions to consume. Matches will be added to topics.
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
# topic_regexps = [ ]
@ -116,14 +116,13 @@ to use them.
## Defaults to the OS configuration if not specified or zero.
# keep_alive_period = "15s"
## SASL authentication credentials. These settings should typically be used
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
# sasl_username = ""
# sasl_password = ""
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
## Optional SASL, one of:
## OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS-MSK-IAM
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI
@ -138,7 +137,19 @@ to use them.
## used if sasl_mechanism is OAUTHBEARER
# sasl_access_token = ""
## SASL protocol version. When connecting to Azure EventHub set to 0.
## used if sasl_mechanism is AWS-MSK-IAM
# sasl_aws_msk_iam_region = ""
## for profile based auth
## sasl_aws_msk_iam_profile = ""
## for role based auth
## sasl_aws_msk_iam_role = ""
## sasl_aws_msk_iam_session = ""
## Arbitrary key value string pairs to pass as a TOML table. For example:
## {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl_extensions = {}
## SASL protocol version. When connecting to Azure EventHub set to 0.
# sasl_version = 1
# Disable Kafka metadata full fetch
@ -221,7 +232,7 @@ to use them.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"
```
## Metrics

View File

@ -6,7 +6,7 @@
## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
@ -16,7 +16,7 @@
## Topics to consume.
topics = ["telegraf"]
## Topic regular expressions to consume. Matches will be added to topics.
## Topic regular expressions to consume. Matches will be added to topics.
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
# topic_regexps = [ ]
@ -55,14 +55,13 @@
## Defaults to the OS configuration if not specified or zero.
# keep_alive_period = "15s"
## SASL authentication credentials. These settings should typically be used
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
# sasl_username = ""
# sasl_password = ""
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
## Optional SASL, one of:
## OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS-MSK-IAM
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI
@ -77,7 +76,19 @@
## used if sasl_mechanism is OAUTHBEARER
# sasl_access_token = ""
## SASL protocol version. When connecting to Azure EventHub set to 0.
## used if sasl_mechanism is AWS-MSK-IAM
# sasl_aws_msk_iam_region = ""
## for profile based auth
## sasl_aws_msk_iam_profile = ""
## for role based auth
## sasl_aws_msk_iam_role = ""
## sasl_aws_msk_iam_session = ""
## Arbitrary key value string pairs to pass as a TOML table. For example:
## {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl_extensions = {}
## SASL protocol version. When connecting to Azure EventHub set to 0.
# sasl_version = 1
# Disable Kafka metadata full fetch
@ -160,4 +171,4 @@
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"

View File

@ -154,12 +154,11 @@ to use them.
# socks5_password = "pass123"
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
# sasl_username = ""
# sasl_password = ""
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
## Optional SASL, one of:
## OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS-MSK-IAM
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI
@ -174,8 +173,16 @@ to use them.
## Access token used if sasl_mechanism is OAUTHBEARER
# sasl_access_token = ""
## used if sasl_mechanism is AWS-MSK-IAM
# sasl_aws_msk_iam_region = ""
## for profile based auth
## sasl_aws_msk_iam_profile = ""
## for role based auth
## sasl_aws_msk_iam_role = ""
## sasl_aws_msk_iam_session = ""
## Arbitrary key value string pairs to pass as a TOML table. For example:
# {logicalCluster = "cluster-042", poolId = "pool-027"}
## {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl_extensions = {}
## SASL protocol version. When connecting to Azure EventHub set to 0.

View File

@ -109,12 +109,11 @@
# socks5_password = "pass123"
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
# sasl_username = ""
# sasl_password = ""
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
## Optional SASL, one of:
## OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS-MSK-IAM
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI
@ -129,8 +128,16 @@
## Access token used if sasl_mechanism is OAUTHBEARER
# sasl_access_token = ""
## used if sasl_mechanism is AWS-MSK-IAM
# sasl_aws_msk_iam_region = ""
## for profile based auth
## sasl_aws_msk_iam_profile = ""
## for role based auth
## sasl_aws_msk_iam_role = ""
## sasl_aws_msk_iam_session = ""
## Arbitrary key value string pairs to pass as a TOML table. For example:
# {logicalCluster = "cluster-042", poolId = "pool-027"}
## {logicalCluster = "cluster-042", poolId = "pool-027"}
# sasl_extensions = {}
## SASL protocol version. When connecting to Azure EventHub set to 0.