From 6618209bcc3709abd5164a2ee89d3087735a0b89 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 6 Feb 2026 17:45:59 +0800 Subject: [PATCH] optimzie code of rabbitmq connection --- main.go | 3 + mq/rabbitmq_init.go | 133 +++++++++++++-------- real-time-data/real_time_data_computing.go | 2 +- 3 files changed, 85 insertions(+), 53 deletions(-) diff --git a/main.go b/main.go index 8658a56..8cc8a76 100644 --- a/main.go +++ b/main.go @@ -243,9 +243,12 @@ func main() { signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { <-done + logger.Info(ctx, "shutdown signal received, cleaning up...") if err := server.Shutdown(context.Background()); err != nil { logger.Error(ctx, "shutdown serverError", "err", err) } + mq.CloseRabbitProxy() + logger.Info(ctx, "resources cleaned up, exiting") }() logger.Info(ctx, "starting ModelRT server") diff --git a/mq/rabbitmq_init.go b/mq/rabbitmq_init.go index 9eaa9f5..511547a 100644 --- a/mq/rabbitmq_init.go +++ b/mq/rabbitmq_init.go @@ -26,12 +26,14 @@ var ( // RabbitMQProxy define stuct of rabbitMQ connection proxy type RabbitMQProxy struct { - Conn *amqp.Connection - mu sync.Mutex + tlsConf *tls.Config + conn *amqp.Connection + cancel context.CancelFunc // 增加这个用于停止重连协程 + mu sync.Mutex } -// RabbitMQCertConf define stuct of rabbitMQ connection certificates config -type RabbitMQCertConf struct { +// rabbitMQCertConf define stuct of rabbitMQ connection certificates config +type rabbitMQCertConf struct { serverName string insecureSkipVerify bool clientCert tls.Certificate @@ -42,45 +44,38 @@ type RabbitMQCertConf struct { func GetConn() *amqp.Connection { _globalRabbitMQProxy.mu.Lock() defer _globalRabbitMQProxy.mu.Unlock() - return _globalRabbitMQProxy.Conn + return _globalRabbitMQProxy.conn } // InitRabbitProxy return instance of rabbitMQ connection func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { amqpURI := generateRabbitMQURI(rCfg) - certConf, err := initCertConf(rCfg) + tlsConf, err := initCertConf(rCfg) if err != nil { logger.Error(ctx, "init rabbitMQ cert config failed", "error", err) panic(err) } rabbitMQOnce.Do(func() { - conn := initRabbitMQ(ctx, amqpURI, certConf) - _globalRabbitMQProxy = &RabbitMQProxy{Conn: conn} - go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI) + cancelCtx, cancel := context.WithCancel(ctx) + conn := initRabbitMQ(ctx, amqpURI, tlsConf) + _globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel} + go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI) }) return _globalRabbitMQProxy } // initRabbitMQ return instance of rabbitMQ connection -func initRabbitMQ(ctx context.Context, rabbitMQURI string, certConf *RabbitMQCertConf) *amqp.Connection { - logger.Info(ctx, "connecting to rabbitMQ server", "rabbit_uri", rabbitMQURI) - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{certConf.clientCert}, - RootCAs: certConf.caCertPool, - InsecureSkipVerify: certConf.insecureSkipVerify, - ServerName: certConf.serverName, - } - +func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection { + logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI) conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ - TLSClientConfig: tlsConfig, + TLSClientConfig: tlsConf, SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, Heartbeat: 10 * time.Second, }) if err != nil { logger.Error(ctx, "init rabbitMQ connection failed", "error", err) + panic(err) } - defer conn.Close() return conn } @@ -90,31 +85,65 @@ func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) closeChan := make(chan *amqp.Error) GetConn().NotifyClose(closeChan) - err, ok := <-closeChan - - if !ok { - logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor") - break - } - - if err != nil { - logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err) - } else { - logger.Info(ctx, "rabbitMQ connection closed normally (nil err)") - } - - for { - time.Sleep(5 * time.Second) - newConn, err := amqp.Dial(rabbitMQURI) - if err == nil { - p.mu.Lock() - p.Conn = newConn - p.mu.Unlock() - logger.Info(ctx, "rabbitMQ reconnected successfully") - break + select { + case <-ctx.Done(): + logger.Info(ctx, "context cancelled, exiting handleReconnect") + return + case err, ok := <-closeChan: + if !ok { + logger.Info(ctx, "rabbitMQ notify channel closed") + return } - logger.Error(ctx, "rabbitMQ reconnect failed", "err", err) + + if err == nil { + logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect") + return + } + + logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err) } + + if !p.reconnect(ctx, rabbitMQURI) { + return + } + } +} + +func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool { + for { + logger.Info(ctx, "attempting to reconnect to rabbitMQ...") + select { + case <-ctx.Done(): + return false + case <-time.After(5 * time.Second): + + } + + newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: p.tlsConf, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err == nil { + p.mu.Lock() + p.conn = newConn + p.mu.Unlock() + logger.Info(ctx, "rabbitMQ reconnected successfully") + return true + } + logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err) + } +} + +// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine +func CloseRabbitProxy() { + if _globalRabbitMQProxy != nil { + _globalRabbitMQProxy.cancel() + _globalRabbitMQProxy.mu.Lock() + if _globalRabbitMQProxy.conn != nil { + _globalRabbitMQProxy.conn.Close() + } + _globalRabbitMQProxy.mu.Unlock() } } @@ -122,7 +151,7 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { user := url.QueryEscape(rCfg.User) password := url.QueryEscape(rCfg.Password) - amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/", + amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", user, password, rCfg.Host, @@ -131,10 +160,10 @@ func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { return amqpURI } -func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) { - certConf := &RabbitMQCertConf{ - insecureSkipVerify: rCfg.InsecureSkipVerify, - serverName: rCfg.ServerName, +func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) { + tlsConf := &tls.Config{ + InsecureSkipVerify: rCfg.InsecureSkipVerify, + ServerName: rCfg.ServerName, } caCert, err := os.ReadFile(rCfg.CACertPath) @@ -145,7 +174,7 @@ func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) { if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath) } - certConf.caCertPool = caCertPool + tlsConf.RootCAs = caCertPool certPEM, err := os.ReadFile(rCfg.ClientCertPath) if err != nil { @@ -179,6 +208,6 @@ func initCertConf(rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) { return nil, fmt.Errorf("create x509 key pair failed: %w", err) } - certConf.clientCert = clientCert - return certConf, nil + tlsConf.Certificates = []tls.Certificate{clientCert} + return tlsConf, nil } diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 88a2a8a..8b4fe45 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -165,7 +165,7 @@ func processCauseMap(data map[string]any) (map[string]any, error) { } } - if foundFloatKey == true { + if foundFloatKey { return causeResult, nil }