80 lines
2.2 KiB
Go
80 lines
2.2 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
|
|
)
|
|
|
|
const saslTypeOAuthAWSMSKIAM = "AWS-MSK-IAM"
|
|
|
|
type SASLOAuthAWSMSKIAMConfig struct {
|
|
SASLAWSRegion string `toml:"sasl_aws_msk_iam_region"`
|
|
SASLAWSProfile string `toml:"sasl_aws_msk_iam_profile"`
|
|
SASLAWSRole string `toml:"sasl_aws_msk_iam_role"`
|
|
SASLAWSSession string `toml:"sasl_aws_msk_iam_session"`
|
|
}
|
|
|
|
func (c *SASLOAuthAWSMSKIAMConfig) tokenProvider(extensions map[string]string) (sarama.AccessTokenProvider, error) {
|
|
if c.SASLAWSRegion == "" {
|
|
return nil, errors.New("region cannot be empty")
|
|
}
|
|
|
|
if c.SASLAWSProfile != "" && (c.SASLAWSRole != "" || c.SASLAWSSession != "") {
|
|
return nil, errors.New("cannot mix profile based and role based authentication")
|
|
}
|
|
|
|
if c.SASLAWSProfile == "" && (c.SASLAWSRole == "" || c.SASLAWSSession == "") {
|
|
return nil, errors.New("both role and session must be set for role based authentication")
|
|
}
|
|
|
|
if c.SASLAWSProfile != "" {
|
|
return &oauthAWSMSKIAM{
|
|
generator: func(ctx context.Context) (string, error) {
|
|
t, _, err := signer.GenerateAuthTokenFromProfile(ctx, c.SASLAWSRegion, c.SASLAWSProfile)
|
|
return t, err
|
|
},
|
|
extensions: extensions,
|
|
}, nil
|
|
}
|
|
|
|
// Generate using role/session
|
|
if c.SASLAWSRole != "" && c.SASLAWSSession != "" {
|
|
return &oauthAWSMSKIAM{
|
|
generator: func(ctx context.Context) (string, error) {
|
|
t, _, err := signer.GenerateAuthTokenFromRole(ctx, c.SASLAWSRegion, c.SASLAWSRole, c.SASLAWSSession)
|
|
return t, err
|
|
},
|
|
extensions: extensions,
|
|
}, nil
|
|
}
|
|
|
|
return &oauthAWSMSKIAM{
|
|
generator: func(ctx context.Context) (string, error) {
|
|
t, _, err := signer.GenerateAuthToken(ctx, c.SASLAWSRegion)
|
|
return t, err
|
|
},
|
|
extensions: extensions,
|
|
}, nil
|
|
}
|
|
|
|
type oauthAWSMSKIAM struct {
|
|
generator func(context.Context) (string, error)
|
|
extensions map[string]string
|
|
}
|
|
|
|
// Token generates a token using the provided AWS MSK IAM generator function.
|
|
func (a *oauthAWSMSKIAM) Token() (*sarama.AccessToken, error) {
|
|
token, err := a.generator(context.Background())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting AWS MSK IAM token failed: %w", err)
|
|
}
|
|
return &sarama.AccessToken{
|
|
Token: token,
|
|
Extensions: a.extensions,
|
|
}, nil
|
|
}
|