From 722e8a9c0f547ab8129ed09dc10fe55cbf0fec51 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 6 Feb 2026 17:55:30 +0800 Subject: [PATCH] optimize code of rabbitmq connection --- .gitignore | 3 +- config/config.go | 15 ++- deploy/rabbitmq-config.yaml | 1 + deploy/secert.sh | 1 - go.mod | 2 +- main.go | 6 +- mq/mq_init.go | 180 +++++++++++++++++++++++++++++------- 7 files changed, 165 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 5ad5562..531a22a 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,5 @@ go.work # Shield all log files in the log folder /log/ # Shield config files in the configs folder -/configs/**/*.yaml \ No newline at end of file +/configs/**/*.yaml +/configs/**/*.pem \ No newline at end of file diff --git a/config/config.go b/config/config.go index e5fb6ef..c94c3b7 100644 --- a/config/config.go +++ b/config/config.go @@ -30,12 +30,17 @@ type MongoDBConfig struct { Timeout int `mapstructure:"timeout"` } -// RabbitMQConfig define config struct of RabbitMQ config type RabbitMQConfig struct { - User string `mapstructure:"user"` - Password string `mapstructure:"password"` - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` + CACertPath string `mapstructure:"ca_cert_path"` + ClientKeyPath string `mapstructure:"client_key_path"` + ClientKeyPassword string `mapstructure:"client_key_password"` + ClientCertPath string `mapstructure:"client_cert_path"` + InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` + ServerName string `mapstructure:"server_name"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` } // ServiceConfig define config struct of service config diff --git a/deploy/rabbitmq-config.yaml b/deploy/rabbitmq-config.yaml index 5e27eb6..4178401 100644 --- a/deploy/rabbitmq-config.yaml +++ b/deploy/rabbitmq-config.yaml @@ -7,6 +7,7 @@ data: # 确保允许PLAIN认证 auth_mechanisms.1 = PLAIN auth_mechanisms.2 = AMQPLAIN + auth_mechanisms.3 = EXTERNAL # 允许admin用户通过远程方式连接 loopback_users.admin = false # 默认心跳和监听配置可在此扩展 diff --git a/deploy/secert.sh b/deploy/secert.sh index f0c050e..40cf1e3 100644 --- a/deploy/secert.sh +++ b/deploy/secert.sh @@ -1,4 +1,3 @@ -# 创建一个名为 rabbitmq-certs 的 Secret,包含文件夹下的所有证书文件 kubectl create secret generic rabbitmq-certs \ --from-file=ca_certificate.pem=./certs/ca_certificate.pem \ --from-file=server_certificate.pem=./certs/server_certificate.pem \ diff --git a/go.mod b/go.mod index 29b4940..3050ae4 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/natefinch/lumberjack v2.0.0+incompatible github.com/rabbitmq/amqp091-go v1.10.0 github.com/spf13/viper v1.21.0 + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 go.mongodb.org/mongo-driver v1.17.7 go.uber.org/zap v1.27.1 ) @@ -48,7 +49,6 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect diff --git a/main.go b/main.go index 6e9114f..7833fbd 100644 --- a/main.go +++ b/main.go @@ -93,11 +93,15 @@ func main() { go func() { <-notifyCtx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx := context.Background() + logger.Info(ctx, "shutdown signal received, cleaning up...") + shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { logger.Error(shutdownCtx, "server shutdown failed", "err", err) } + mq.CloseRabbitProxy() + logger.Info(ctx, "resources cleaned up, exiting") }() logger.Info(notifyCtx, "starting EventRT server") diff --git a/mq/mq_init.go b/mq/mq_init.go index 56ad1bb..aa8ad7b 100644 --- a/mq/mq_init.go +++ b/mq/mq_init.go @@ -3,8 +3,12 @@ package mq import ( "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" "fmt" "net/url" + "os" "sync" "time" @@ -12,6 +16,7 @@ import ( "eventRT/logger" amqp "github.com/rabbitmq/amqp091-go" + "github.com/youmark/pkcs8" ) var ( @@ -21,34 +26,55 @@ var ( // RabbitMQProxy define stuct of rabbitMQ connection proxy type RabbitMQProxy struct { - Conn *amqp.Connection - mu sync.Mutex + tlsConf *tls.Config + conn *amqp.Connection + cancel context.CancelFunc + mu sync.Mutex +} + +// rabbitMQCertConf define stuct of rabbitMQ connection certificates config +type rabbitMQCertConf struct { + serverName string + insecureSkipVerify bool + clientCert tls.Certificate + caCertPool *x509.CertPool } // GetConn define func to return the rabbitMQ connection func GetConn() *amqp.Connection { _globalRabbitMQProxy.mu.Lock() defer _globalRabbitMQProxy.mu.Unlock() - return _globalRabbitMQProxy.Conn + return _globalRabbitMQProxy.conn } // InitRabbitProxy return instance of rabbitMQ connection func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { amqpURI := generateRabbitMQURI(rCfg) + tlsConf, err := initCertConf(rCfg) + if err != nil { + logger.Error(ctx, "init rabbitMQ cert config failed", "error", err) + panic(err) + } rabbitMQOnce.Do(func() { - conn := initRabbitMQ(ctx, amqpURI) - _globalRabbitMQProxy = &RabbitMQProxy{Conn: conn} - go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI) + cancelCtx, cancel := context.WithCancel(ctx) + conn := initRabbitMQ(ctx, amqpURI, tlsConf) + _globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel} + go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI) }) return _globalRabbitMQProxy } // initRabbitMQ return instance of rabbitMQ connection -func initRabbitMQ(ctx context.Context, rabbitMQURI string) *amqp.Connection { - logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI)) - conn, err := amqp.Dial(rabbitMQURI) +func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection { + logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI) + conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: tlsConf, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) if err != nil { - panic(fmt.Errorf("failed to connect to rabbitMQ: %w", err)) + logger.Error(ctx, "init rabbitMQ connection failed", "error", err) + panic(err) } return conn @@ -59,31 +85,65 @@ func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) closeChan := make(chan *amqp.Error) GetConn().NotifyClose(closeChan) - err, ok := <-closeChan - - if !ok { - logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor") - break - } - - if err != nil { - logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err) - } else { - logger.Info(ctx, "rabbitMQ connection closed normally (nil err)") - } - - for { - time.Sleep(5 * time.Second) - newConn, err := amqp.Dial(rabbitMQURI) - if err == nil { - p.mu.Lock() - p.Conn = newConn - p.mu.Unlock() - logger.Info(ctx, "rabbitMQ reconnected successfully") - break + select { + case <-ctx.Done(): + logger.Info(ctx, "context cancelled, exiting handleReconnect") + return + case err, ok := <-closeChan: + if !ok { + logger.Info(ctx, "rabbitMQ notify channel closed") + return } - logger.Error(ctx, "rabbitMQ reconnect failed", "err", err) + + if err == nil { + logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect") + return + } + + logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err) } + + if !p.reconnect(ctx, rabbitMQURI) { + return + } + } +} + +func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool { + for { + logger.Info(ctx, "attempting to reconnect to rabbitMQ...") + select { + case <-ctx.Done(): + return false + case <-time.After(5 * time.Second): + + } + + newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: p.tlsConf, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err == nil { + p.mu.Lock() + p.conn = newConn + p.mu.Unlock() + logger.Info(ctx, "rabbitMQ reconnected successfully") + return true + } + logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err) + } +} + +// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine +func CloseRabbitProxy() { + if _globalRabbitMQProxy != nil { + _globalRabbitMQProxy.cancel() + _globalRabbitMQProxy.mu.Lock() + if _globalRabbitMQProxy.conn != nil { + _globalRabbitMQProxy.conn.Close() + } + _globalRabbitMQProxy.mu.Unlock() } } @@ -91,7 +151,7 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { user := url.QueryEscape(rCfg.User) password := url.QueryEscape(rCfg.Password) - amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/", + amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", user, password, rCfg.Host, @@ -99,3 +159,55 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { ) return amqpURI } + +func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) { + tlsConf := &tls.Config{ + InsecureSkipVerify: rCfg.InsecureSkipVerify, + ServerName: rCfg.ServerName, + } + + caCert, err := os.ReadFile(rCfg.CACertPath) + if err != nil { + return nil, fmt.Errorf("read server ca file failed: %w", err) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath) + } + tlsConf.RootCAs = caCertPool + + certPEM, err := os.ReadFile(rCfg.ClientCertPath) + if err != nil { + return nil, fmt.Errorf("read client cert file failed: %w", err) + } + + keyData, err := os.ReadFile(rCfg.ClientKeyPath) + if err != nil { + return nil, fmt.Errorf("read private key file failed: %w", err) + } + + block, _ := pem.Decode(keyData) + if block == nil { + return nil, fmt.Errorf("failed to decode PEM block from private key") + } + + der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword)) + if err != nil { + return nil, fmt.Errorf("parse password-protected private key failed: %w", err) + } + + privBytes, err := x509.MarshalPKCS8PrivateKey(der) + if err != nil { + return nil, fmt.Errorf("marshal private key failed: %w", err) + } + + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + + clientCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("create x509 key pair failed: %w", err) + } + + tlsConf.Certificates = []tls.Certificate{clientCert} + return tlsConf, nil +}