optimzie code of rabbitmq connection

This commit is contained in:
douxu 2026-02-06 17:45:59 +08:00
parent 581153ed8d
commit 6618209bcc
3 changed files with 85 additions and 53 deletions

View File

@ -243,9 +243,12 @@ func main() {
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() { go func() {
<-done <-done
logger.Info(ctx, "shutdown signal received, cleaning up...")
if err := server.Shutdown(context.Background()); err != nil { if err := server.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "shutdown serverError", "err", err) logger.Error(ctx, "shutdown serverError", "err", err)
} }
mq.CloseRabbitProxy()
logger.Info(ctx, "resources cleaned up, exiting")
}() }()
logger.Info(ctx, "starting ModelRT server") logger.Info(ctx, "starting ModelRT server")

View File

@ -26,12 +26,14 @@ var (
// RabbitMQProxy define stuct of rabbitMQ connection proxy // RabbitMQProxy define stuct of rabbitMQ connection proxy
type RabbitMQProxy struct { type RabbitMQProxy struct {
Conn *amqp.Connection tlsConf *tls.Config
mu sync.Mutex conn *amqp.Connection
cancel context.CancelFunc // 增加这个用于停止重连协程
mu sync.Mutex
} }
// RabbitMQCertConf define stuct of rabbitMQ connection certificates config // rabbitMQCertConf define stuct of rabbitMQ connection certificates config
type RabbitMQCertConf struct { type rabbitMQCertConf struct {
serverName string serverName string
insecureSkipVerify bool insecureSkipVerify bool
clientCert tls.Certificate clientCert tls.Certificate
@ -42,45 +44,38 @@ type RabbitMQCertConf struct {
func GetConn() *amqp.Connection { func GetConn() *amqp.Connection {
_globalRabbitMQProxy.mu.Lock() _globalRabbitMQProxy.mu.Lock()
defer _globalRabbitMQProxy.mu.Unlock() defer _globalRabbitMQProxy.mu.Unlock()
return _globalRabbitMQProxy.Conn return _globalRabbitMQProxy.conn
} }
// InitRabbitProxy return instance of rabbitMQ connection // InitRabbitProxy return instance of rabbitMQ connection
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
amqpURI := generateRabbitMQURI(rCfg) amqpURI := generateRabbitMQURI(rCfg)
certConf, err := initCertConf(rCfg) tlsConf, err := initCertConf(rCfg)
if err != nil { if err != nil {
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err) logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
panic(err) panic(err)
} }
rabbitMQOnce.Do(func() { rabbitMQOnce.Do(func() {
conn := initRabbitMQ(ctx, amqpURI, certConf) cancelCtx, cancel := context.WithCancel(ctx)
_globalRabbitMQProxy = &RabbitMQProxy{Conn: conn} conn := initRabbitMQ(ctx, amqpURI, tlsConf)
go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI) _globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
}) })
return _globalRabbitMQProxy return _globalRabbitMQProxy
} }
// initRabbitMQ return instance of rabbitMQ connection // initRabbitMQ return instance of rabbitMQ connection
func initRabbitMQ(ctx context.Context, rabbitMQURI string, certConf *RabbitMQCertConf) *amqp.Connection { func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
logger.Info(ctx, "connecting to rabbitMQ server", "rabbit_uri", rabbitMQURI) logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{certConf.clientCert},
RootCAs: certConf.caCertPool,
InsecureSkipVerify: certConf.insecureSkipVerify,
ServerName: certConf.serverName,
}
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second, Heartbeat: 10 * time.Second,
}) })
if err != nil { if err != nil {
logger.Error(ctx, "init rabbitMQ connection failed", "error", err) logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
panic(err)
} }
defer conn.Close()
return conn return conn
} }
@ -90,31 +85,65 @@ func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string)
closeChan := make(chan *amqp.Error) closeChan := make(chan *amqp.Error)
GetConn().NotifyClose(closeChan) GetConn().NotifyClose(closeChan)
err, ok := <-closeChan select {
case <-ctx.Done():
if !ok { logger.Info(ctx, "context cancelled, exiting handleReconnect")
logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor") return
break case err, ok := <-closeChan:
} if !ok {
logger.Info(ctx, "rabbitMQ notify channel closed")
if err != nil { return
logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err)
} else {
logger.Info(ctx, "rabbitMQ connection closed normally (nil err)")
}
for {
time.Sleep(5 * time.Second)
newConn, err := amqp.Dial(rabbitMQURI)
if err == nil {
p.mu.Lock()
p.Conn = newConn
p.mu.Unlock()
logger.Info(ctx, "rabbitMQ reconnected successfully")
break
} }
logger.Error(ctx, "rabbitMQ reconnect failed", "err", err)
if err == nil {
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
return
}
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
} }
if !p.reconnect(ctx, rabbitMQURI) {
return
}
}
}
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
for {
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
select {
case <-ctx.Done():
return false
case <-time.After(5 * time.Second):
}
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: p.tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err == nil {
p.mu.Lock()
p.conn = newConn
p.mu.Unlock()
logger.Info(ctx, "rabbitMQ reconnected successfully")
return true
}
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
}
}
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
func CloseRabbitProxy() {
if _globalRabbitMQProxy != nil {
_globalRabbitMQProxy.cancel()
_globalRabbitMQProxy.mu.Lock()
if _globalRabbitMQProxy.conn != nil {
_globalRabbitMQProxy.conn.Close()
}
_globalRabbitMQProxy.mu.Unlock()
} }
} }
@ -122,7 +151,7 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
user := url.QueryEscape(rCfg.User) user := url.QueryEscape(rCfg.User)
password := url.QueryEscape(rCfg.Password) password := url.QueryEscape(rCfg.Password)
amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/", amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
user, user,
password, password,
rCfg.Host, rCfg.Host,
@ -131,10 +160,10 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
return amqpURI return amqpURI
} }
func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) { func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
certConf := &RabbitMQCertConf{ tlsConf := &tls.Config{
insecureSkipVerify: rCfg.InsecureSkipVerify, InsecureSkipVerify: rCfg.InsecureSkipVerify,
serverName: rCfg.ServerName, ServerName: rCfg.ServerName,
} }
caCert, err := os.ReadFile(rCfg.CACertPath) caCert, err := os.ReadFile(rCfg.CACertPath)
@ -145,7 +174,7 @@ func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) {
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath) return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
} }
certConf.caCertPool = caCertPool tlsConf.RootCAs = caCertPool
certPEM, err := os.ReadFile(rCfg.ClientCertPath) certPEM, err := os.ReadFile(rCfg.ClientCertPath)
if err != nil { if err != nil {
@ -179,6 +208,6 @@ func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) {
return nil, fmt.Errorf("create x509 key pair failed: %w", err) return nil, fmt.Errorf("create x509 key pair failed: %w", err)
} }
certConf.clientCert = clientCert tlsConf.Certificates = []tls.Certificate{clientCert}
return certConf, nil return tlsConf, nil
} }

View File

@ -165,7 +165,7 @@ func processCauseMap(data map[string]any) (map[string]any, error) {
} }
} }
if foundFloatKey == true { if foundFloatKey {
return causeResult, nil return causeResult, nil
} }