// Package mq define message queue operation functions package mq import ( "context" "crypto/tls" "crypto/x509" "encoding/pem" "fmt" "os" "sync" "time" "eventRT/config" "eventRT/logger" amqp "github.com/rabbitmq/amqp091-go" "github.com/youmark/pkcs8" ) var ( _globalRabbitMQProxy *RabbitMQProxy rabbitMQOnce sync.Once ) // RabbitMQProxy define stuct of rabbitMQ connection proxy type RabbitMQProxy struct { tlsConf *tls.Config conn *amqp.Connection cancel context.CancelFunc mu sync.Mutex } // rabbitMQCertConf define stuct of rabbitMQ connection certificates config type rabbitMQCertConf struct { serverName string insecureSkipVerify bool clientCert tls.Certificate caCertPool *x509.CertPool } // GetConn define func to return the rabbitMQ connection func GetConn() *amqp.Connection { _globalRabbitMQProxy.mu.Lock() defer _globalRabbitMQProxy.mu.Unlock() return _globalRabbitMQProxy.conn } // InitRabbitProxy return instance of rabbitMQ connection func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { amqpURI := generateRabbitMQURI(rCfg) tlsConf, err := initCertConf(rCfg) if err != nil { logger.Error(ctx, "init rabbitMQ cert config failed", "error", err) panic(err) } rabbitMQOnce.Do(func() { cancelCtx, cancel := context.WithCancel(ctx) conn := initRabbitMQ(ctx, amqpURI, tlsConf) _globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel} go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI) }) return _globalRabbitMQProxy } // initRabbitMQ return instance of rabbitMQ connection func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection { logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI) conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ TLSClientConfig: tlsConf, SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, Heartbeat: 10 * time.Second, }) if err != nil { logger.Error(ctx, "init rabbitMQ connection failed", "error", err) panic(err) } return conn } func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) { for { closeChan := make(chan *amqp.Error) GetConn().NotifyClose(closeChan) select { case <-ctx.Done(): logger.Info(ctx, "context cancelled, exiting handleReconnect") return case err, ok := <-closeChan: if !ok { logger.Info(ctx, "rabbitMQ notify channel closed") return } if err == nil { logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect") return } logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err) } if !p.reconnect(ctx, rabbitMQURI) { return } } } func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool { for { logger.Info(ctx, "attempting to reconnect to rabbitMQ...") select { case <-ctx.Done(): return false case <-time.After(5 * time.Second): } newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ TLSClientConfig: p.tlsConf, SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, Heartbeat: 10 * time.Second, }) if err == nil { p.mu.Lock() p.conn = newConn p.mu.Unlock() logger.Info(ctx, "rabbitMQ reconnected successfully") return true } logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err) } } // CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine func CloseRabbitProxy() { if _globalRabbitMQProxy != nil { _globalRabbitMQProxy.cancel() _globalRabbitMQProxy.mu.Lock() if _globalRabbitMQProxy.conn != nil { _globalRabbitMQProxy.conn.Close() } _globalRabbitMQProxy.mu.Unlock() } } func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { // TODO 考虑拆分用户名密码配置项,兼容不同认证方式 // user := url.QueryEscape(rCfg.User) // password := url.QueryEscape(rCfg.Password) // amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", // user, // password, // rCfg.Host, // rCfg.Port, // ) amqpURI := fmt.Sprintf("amqps://%s:%d/", rCfg.Host, rCfg.Port, ) return amqpURI } func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) { tlsConf := &tls.Config{ InsecureSkipVerify: rCfg.InsecureSkipVerify, ServerName: rCfg.ServerName, } caCert, err := os.ReadFile(rCfg.CACertPath) if err != nil { return nil, fmt.Errorf("read server ca file failed: %w", err) } caCertPool := x509.NewCertPool() if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath) } tlsConf.RootCAs = caCertPool certPEM, err := os.ReadFile(rCfg.ClientCertPath) if err != nil { return nil, fmt.Errorf("read client cert file failed: %w", err) } keyData, err := os.ReadFile(rCfg.ClientKeyPath) if err != nil { return nil, fmt.Errorf("read private key file failed: %w", err) } block, _ := pem.Decode(keyData) if block == nil { return nil, fmt.Errorf("failed to decode PEM block from private key") } der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword)) if err != nil { return nil, fmt.Errorf("parse password-protected private key failed: %w", err) } privBytes, err := x509.MarshalPKCS8PrivateKey(der) if err != nil { return nil, fmt.Errorf("marshal private key failed: %w", err) } keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) clientCert, err := tls.X509KeyPair(certPEM, keyPEM) if err != nil { return nil, fmt.Errorf("create x509 key pair failed: %w", err) } tlsConf.Certificates = []tls.Certificate{clientCert} return tlsConf, nil }