optimize code of rabbitmq connection

This commit is contained in:
douxu 2026-02-06 17:55:30 +08:00
parent fff5dd9218
commit 722e8a9c0f
7 changed files with 165 additions and 43 deletions

3
.gitignore vendored
View File

@ -27,4 +27,5 @@ go.work
# Shield all log files in the log folder
/log/
# Shield config files in the configs folder
/configs/**/*.yaml
/configs/**/*.yaml
/configs/**/*.pem

View File

@ -30,12 +30,17 @@ type MongoDBConfig struct {
Timeout int `mapstructure:"timeout"`
}
// RabbitMQConfig define config struct of RabbitMQ config
type RabbitMQConfig struct {
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
CACertPath string `mapstructure:"ca_cert_path"`
ClientKeyPath string `mapstructure:"client_key_path"`
ClientKeyPassword string `mapstructure:"client_key_password"`
ClientCertPath string `mapstructure:"client_cert_path"`
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
ServerName string `mapstructure:"server_name"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
}
// ServiceConfig define config struct of service config

View File

@ -7,6 +7,7 @@ data:
# 确保允许PLAIN认证
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.3 = EXTERNAL
# 允许admin用户通过远程方式连接
loopback_users.admin = false
# 默认心跳和监听配置可在此扩展

View File

@ -1,4 +1,3 @@
# 创建一个名为 rabbitmq-certs 的 Secret包含文件夹下的所有证书文件
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./certs/ca_certificate.pem \
--from-file=server_certificate.pem=./certs/server_certificate.pem \

2
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/rabbitmq/amqp091-go v1.10.0
github.com/spf13/viper v1.21.0
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
go.mongodb.org/mongo-driver v1.17.7
go.uber.org/zap v1.27.1
)
@ -48,7 +49,6 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect

View File

@ -93,11 +93,15 @@ func main() {
go func() {
<-notifyCtx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx := context.Background()
logger.Info(ctx, "shutdown signal received, cleaning up...")
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Error(shutdownCtx, "server shutdown failed", "err", err)
}
mq.CloseRabbitProxy()
logger.Info(ctx, "resources cleaned up, exiting")
}()
logger.Info(notifyCtx, "starting EventRT server")

View File

@ -3,8 +3,12 @@ package mq
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"net/url"
"os"
"sync"
"time"
@ -12,6 +16,7 @@ import (
"eventRT/logger"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/youmark/pkcs8"
)
var (
@ -21,34 +26,55 @@ var (
// RabbitMQProxy define stuct of rabbitMQ connection proxy
type RabbitMQProxy struct {
Conn *amqp.Connection
mu sync.Mutex
tlsConf *tls.Config
conn *amqp.Connection
cancel context.CancelFunc
mu sync.Mutex
}
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
type rabbitMQCertConf struct {
serverName string
insecureSkipVerify bool
clientCert tls.Certificate
caCertPool *x509.CertPool
}
// GetConn define func to return the rabbitMQ connection
func GetConn() *amqp.Connection {
_globalRabbitMQProxy.mu.Lock()
defer _globalRabbitMQProxy.mu.Unlock()
return _globalRabbitMQProxy.Conn
return _globalRabbitMQProxy.conn
}
// InitRabbitProxy return instance of rabbitMQ connection
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
amqpURI := generateRabbitMQURI(rCfg)
tlsConf, err := initCertConf(rCfg)
if err != nil {
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
panic(err)
}
rabbitMQOnce.Do(func() {
conn := initRabbitMQ(ctx, amqpURI)
_globalRabbitMQProxy = &RabbitMQProxy{Conn: conn}
go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI)
cancelCtx, cancel := context.WithCancel(ctx)
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
})
return _globalRabbitMQProxy
}
// initRabbitMQ return instance of rabbitMQ connection
func initRabbitMQ(ctx context.Context, rabbitMQURI string) *amqp.Connection {
logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI))
conn, err := amqp.Dial(rabbitMQURI)
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err != nil {
panic(fmt.Errorf("failed to connect to rabbitMQ: %w", err))
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
panic(err)
}
return conn
@ -59,31 +85,65 @@ func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string)
closeChan := make(chan *amqp.Error)
GetConn().NotifyClose(closeChan)
err, ok := <-closeChan
if !ok {
logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor")
break
}
if err != nil {
logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err)
} else {
logger.Info(ctx, "rabbitMQ connection closed normally (nil err)")
}
for {
time.Sleep(5 * time.Second)
newConn, err := amqp.Dial(rabbitMQURI)
if err == nil {
p.mu.Lock()
p.Conn = newConn
p.mu.Unlock()
logger.Info(ctx, "rabbitMQ reconnected successfully")
break
select {
case <-ctx.Done():
logger.Info(ctx, "context cancelled, exiting handleReconnect")
return
case err, ok := <-closeChan:
if !ok {
logger.Info(ctx, "rabbitMQ notify channel closed")
return
}
logger.Error(ctx, "rabbitMQ reconnect failed", "err", err)
if err == nil {
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
return
}
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
}
if !p.reconnect(ctx, rabbitMQURI) {
return
}
}
}
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
for {
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
select {
case <-ctx.Done():
return false
case <-time.After(5 * time.Second):
}
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: p.tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err == nil {
p.mu.Lock()
p.conn = newConn
p.mu.Unlock()
logger.Info(ctx, "rabbitMQ reconnected successfully")
return true
}
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
}
}
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
func CloseRabbitProxy() {
if _globalRabbitMQProxy != nil {
_globalRabbitMQProxy.cancel()
_globalRabbitMQProxy.mu.Lock()
if _globalRabbitMQProxy.conn != nil {
_globalRabbitMQProxy.conn.Close()
}
_globalRabbitMQProxy.mu.Unlock()
}
}
@ -91,7 +151,7 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
user := url.QueryEscape(rCfg.User)
password := url.QueryEscape(rCfg.Password)
amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/",
amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
user,
password,
rCfg.Host,
@ -99,3 +159,55 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
)
return amqpURI
}
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
tlsConf := &tls.Config{
InsecureSkipVerify: rCfg.InsecureSkipVerify,
ServerName: rCfg.ServerName,
}
caCert, err := os.ReadFile(rCfg.CACertPath)
if err != nil {
return nil, fmt.Errorf("read server ca file failed: %w", err)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
}
tlsConf.RootCAs = caCertPool
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
if err != nil {
return nil, fmt.Errorf("read client cert file failed: %w", err)
}
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("read private key file failed: %w", err)
}
block, _ := pem.Decode(keyData)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block from private key")
}
der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
if err != nil {
return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
}
privBytes, err := x509.MarshalPKCS8PrivateKey(der)
if err != nil {
return nil, fmt.Errorf("marshal private key failed: %w", err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, fmt.Errorf("create x509 key pair failed: %w", err)
}
tlsConf.Certificates = []tls.Certificate{clientCert}
return tlsConf, nil
}