From 5653fb07197aeb03acfadbf215259f522c0952e5 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 30 Jan 2026 17:43:16 +0800 Subject: [PATCH] optimze of rabbitmq and mongodb code --- config/config.go | 24 ++++++++--- database/mongo_init.go | 47 +++++++++++++++------ go.mod | 2 +- go.sum | 4 +- main.go | 42 ++++++++++++------- mq/mq_init.go | 95 ++++++++++++++++++++++++++++++++++++++---- 6 files changed, 170 insertions(+), 44 deletions(-) diff --git a/config/config.go b/config/config.go index 7c58ee2..e5fb6ef 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" + "net/url" "github.com/spf13/viper" ) @@ -20,14 +21,17 @@ type LoggerConfig struct { // MongoDBConfig define config struct of mongoDB config type MongoDBConfig struct { - URI string `mapstructure:"uri"` + Host string `mapstructure:"host"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` Database string `mapstructure:"database"` + AuthDB string `mapstructure:"auth_db"` + Port int `mapstructure:"port"` Timeout int `mapstructure:"timeout"` } // RabbitMQConfig define config struct of RabbitMQ config type RabbitMQConfig struct { - URL string `mapstructure:"url"` User string `mapstructure:"user"` Password string `mapstructure:"password"` Host string `mapstructure:"host"` @@ -47,7 +51,8 @@ type EventRTConfig struct { ServiceConfig `mapstructure:"service"` LoggerConfig `mapstructure:"logger"` MongoDBConfig `mapstructure:"mongodb"` - RabbitMQConfig `mapstructure:"rabbitmq"` + RabbitMQConfig `mapstructure:"rabbitMQ"` + MongoDBURI string } // ReadAndInitConfig return eventRT project config struct @@ -67,7 +72,16 @@ func ReadAndInitConfig(configDir, configName, configType string) (eventRTConfig panic(fmt.Sprintf("unmarshal eventRT config failed:%s\n", err.Error())) } - // init postgres db uri - // eventRTConfig.PostgresDBURI = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", eventRTConfig.PostgresConfig.Host, eventRTConfig.PostgresConfig.Port, eventRTConfig.PostgresConfig.User, eventRTConfig.PostgresConfig.Password, eventRTConfig.PostgresConfig.DataBase) + if eventRTConfig.MongoDBConfig.Timeout <= 0 { + eventRTConfig.MongoDBConfig.Timeout = 10 + } + if eventRTConfig.MongoDBConfig.AuthDB == "" { + eventRTConfig.MongoDBConfig.AuthDB = "admin" + } + + // init mongoDB uri + user := url.QueryEscape(eventRTConfig.MongoDBConfig.User) + password := url.QueryEscape(eventRTConfig.MongoDBConfig.Password) + eventRTConfig.MongoDBURI = fmt.Sprintf("mongodb://%s:%s@%s:%d/?authSource=%s", user, password, eventRTConfig.MongoDBConfig.Host, eventRTConfig.MongoDBConfig.Port, eventRTConfig.MongoDBConfig.AuthDB) return eventRTConfig } diff --git a/database/mongo_init.go b/database/mongo_init.go index e25d3fb..0606537 100644 --- a/database/mongo_init.go +++ b/database/mongo_init.go @@ -6,28 +6,51 @@ import ( "sync" "time" - "eventRT/config" + "eventRT/config" // + "eventRT/logger" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) var ( - _globalMongoClient *mongo.Client mongoOnce sync.Once + _globalMongoClient *mongo.Client ) -// InitMongoInstance return instance of MongoDB client -func InitMongoInstance(ctx context.Context, mCfg config.MongoDBConfig) *mongo.Client { - mongoOnce.Do(func() { - cancelCtx, cancel := context.WithTimeout(ctx, time.Duration(mCfg.Timeout)*time.Second) - defer cancel() +// GetMongoClient returns the global MongoDB client +func GetMongoClient() *mongo.Client { + return _globalMongoClient +} - client, err := mongo.Connect(cancelCtx, options.Client().ApplyURI(mCfg.URI)) - if err != nil { - panic(err) - } - _globalMongoClient = client +// InitMongoInstance return instance of MongoDB client +func InitMongoInstance(ctx context.Context, eCfg config.EventRTConfig) *mongo.Client { + mongoOnce.Do(func() { + _globalMongoClient = initMongoClient(ctx, eCfg.MongoDBURI, eCfg.Timeout) }) return _globalMongoClient } + +// initMongoClient return successfully initialized MongoDB client +func initMongoClient(ctx context.Context, mongoDBURI string, timeout int) *mongo.Client { + clientOptions := options.Client().ApplyURI(mongoDBURI) + client, err := mongo.Connect(ctx, clientOptions) + if err != nil { + logger.Error(ctx, "failed to connect to MongoDB", "error", err) + panic(err) + } + + pingTimeout := time.Duration(timeout) * time.Second + if pingTimeout == 0 { + pingTimeout = 10 * time.Second + } + + pingCtx, cancel := context.WithTimeout(ctx, pingTimeout) + defer cancel() + if err := client.Ping(pingCtx, nil); err != nil { + logger.Error(ctx, "failed to ping operation with MongoDB", "error", err) + panic(err) + } + + return client +} diff --git a/go.mod b/go.mod index 0f8d995..29b4940 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.24.1 require ( github.com/gin-gonic/gin v1.11.0 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/rabbitmq/amqp091-go v1.10.0 github.com/spf13/viper v1.21.0 - github.com/streadway/amqp v1.1.0 go.mongodb.org/mongo-driver v1.17.7 go.uber.org/zap v1.27.1 ) diff --git a/go.sum b/go.sum index 5902f50..ac4444e 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= @@ -82,8 +84,6 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= -github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= -github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/main.go b/main.go index 3c2910b..6e9114f 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "os/signal" "path/filepath" "syscall" + "time" "eventRT/config" "eventRT/constants" @@ -31,7 +32,6 @@ var ( func main() { flag.Parse() - ctx := context.TODO() configPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+"."+*eventRTConfigType) if _, err := os.Stat(configPath); os.IsNotExist(err) { @@ -52,16 +52,27 @@ func main() { } eventRTConfig := config.ReadAndInitConfig(*eventRTConfigDir, *eventRTConfigName, *eventRTConfigType) - + // init logger instance logger.InitLoggerInstance(eventRTConfig.LoggerConfig) - mongoDBClient := database.InitMongoInstance(ctx, eventRTConfig.MongoDBConfig) + // init MongoDB client + notifyCtx, stop := signal.NotifyContext(context.TODO(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + defer stop() - // TODO 等待删除 - fmt.Println(mongoDBClient) + client := database.InitMongoInstance(notifyCtx, eventRTConfig) + defer func() { + disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := client.Disconnect(disconnectCtx); err != nil { + logger.Error(notifyCtx, "mongodb disconnect failed", "err", err) + } else { + logger.Info(notifyCtx, "mongodb connection closed gracefully") + } + }() - rabbitConn := mq.InitRabbitMQ(eventRTConfig.RabbitMQConfig) - defer rabbitConn.Close() + // init RabbitMQ connection + mq.InitRabbitProxy(notifyCtx, eventRTConfig.RabbitMQConfig) + defer mq.GetConn().Close() // use release mode in production if eventRTConfig.DeployEnv == constants.ProductionDeployMode { @@ -80,25 +91,24 @@ func main() { Handler: engine, } - // creating a System Signal Receiver - done := make(chan os.Signal, 10) - signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { - <-done - if err := server.Shutdown(context.Background()); err != nil { - logger.Error(ctx, "shutdown serverError", "err", err) + <-notifyCtx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Error(shutdownCtx, "server shutdown failed", "err", err) } }() - logger.Info(ctx, "starting EventRT server") + logger.Info(notifyCtx, "starting EventRT server") err := server.ListenAndServe() if err != nil { if err == http.ErrServerClosed { // the service receives the shutdown signal normally and then closes - logger.Info(ctx, "server closed under request") + logger.Info(notifyCtx, "server closed under request") } else { // abnormal shutdown of service - logger.Error(ctx, "server closed unexpected", "err", err) + logger.Error(notifyCtx, "server closed unexpected", "err", err) } } } diff --git a/mq/mq_init.go b/mq/mq_init.go index 993e4ab..682d3ce 100644 --- a/mq/mq_init.go +++ b/mq/mq_init.go @@ -2,21 +2,100 @@ package mq import ( + "context" "fmt" + "net/url" + "sync" + "time" "eventRT/config" + "eventRT/logger" - "github.com/streadway/amqp" + amqp "github.com/rabbitmq/amqp091-go" ) -func InitRabbitMQ(rCfg config.RabbitMQConfig) *amqp.Connection { - url := fmt.Sprintf("amqp://%s:%s@%s:%d/", rCfg.User, rCfg.Password, rCfg.Host, rCfg.Port) - if rCfg.URL != "" { - url = rCfg.URL - } - conn, err := amqp.Dial(url) +var ( + _globalRabbitMQProxy *RabbitMQProxy + rabbitMQOnce sync.Once +) + +// RabbitMQProxy define stuct of rabbitMQ connection proxy +type RabbitMQProxy struct { + Conn *amqp.Connection + mu sync.Mutex +} + +// GetConn define func to return the rabbitMQ connection +func GetConn() *amqp.Connection { + _globalRabbitMQProxy.mu.Lock() + defer _globalRabbitMQProxy.mu.Unlock() + return _globalRabbitMQProxy.Conn +} + +// InitRabbitProxy return instance of rabbitMQ connection +func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { + amqpURI := generateRabbitMQURI(rCfg) + rabbitMQOnce.Do(func() { + conn := initRabbitMQ(ctx, amqpURI) + _globalRabbitMQProxy = &RabbitMQProxy{Conn: conn} + go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI) + }) + return _globalRabbitMQProxy +} + +// initRabbitMQ return instance of rabbitMQ connection +func initRabbitMQ(ctx context.Context, rabbitMQURI string) *amqp.Connection { + logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI)) + conn, err := amqp.Dial(rabbitMQURI) if err != nil { - panic(fmt.Errorf("failed to connect to RabbitMQ: %w", err)) + panic(fmt.Errorf("failed to connect to rabbitMQ: %w", err)) } + return conn } + +func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) { + for { + closeChan := make(chan *amqp.Error) + p.Conn.NotifyClose(closeChan) + + err, ok := <-closeChan + + if !ok { + logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor") + break + } + + if err != nil { + logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err) + } else { + logger.Info(ctx, "rabbitMQ connection closed normally (nil err)") + } + + for { + time.Sleep(5 * time.Second) + newConn, err := amqp.Dial(rabbitMQURI) + if err == nil { + p.mu.Lock() + p.Conn = newConn + p.mu.Unlock() + logger.Info(ctx, "rabbitMQ reconnected successfully") + break + } + logger.Error(ctx, "rabbitMQ reconnect failed", "err", err) + } + } +} + +func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { + user := url.QueryEscape(rCfg.User) + password := url.QueryEscape(rCfg.Password) + + amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/", + user, + password, + rCfg.Host, + rCfg.Port, + ) + return amqpURI +}