eventRT/mq/mq_init.go

102 lines
2.3 KiB
Go

// Package mq defines message queue operation functions.
package mq
import (
"context"
"fmt"
"net/url"
"sync"
"time"
"eventRT/config"
"eventRT/logger"
amqp "github.com/rabbitmq/amqp091-go"
)
// Package-level state for the shared rabbitMQ connection proxy.
var (
	// rabbitMQOnce ensures the setup inside InitRabbitProxy runs only once.
	rabbitMQOnce sync.Once

	// _globalRabbitMQProxy is the shared proxy instance. It is assigned by
	// InitRabbitProxy and read by GetConn; it stays nil until then.
	_globalRabbitMQProxy *RabbitMQProxy
)
// RabbitMQProxy defines the struct of the rabbitMQ connection proxy.
//
// It holds the current AMQP connection together with the mutex that
// GetConn and the reconnect loop take when reading or replacing it.
type RabbitMQProxy struct {
	// Conn is the connection currently in use; handleReconnect swaps it
	// for a freshly dialed one after the previous connection closes.
	Conn *amqp.Connection
	// mu is locked around reads of Conn in GetConn and around the
	// replacement of Conn in handleReconnect.
	mu sync.Mutex
}
// GetConn returns the rabbitMQ connection currently held by the
// package-level proxy, reading it while holding the proxy's mutex.
//
// The proxy pointer is dereferenced unconditionally, so calling GetConn
// before InitRabbitProxy has assigned it panics.
func GetConn() *amqp.Connection {
	proxy := _globalRabbitMQProxy

	proxy.mu.Lock()
	current := proxy.Conn
	proxy.mu.Unlock()

	return current
}
// InitRabbitProxy returns the package-level rabbitMQ connection proxy,
// creating it on the first call.
//
// The first call builds the AMQP URI from rCfg, dials the server (via
// initRabbitMQ, which panics on failure), stores the resulting proxy in
// _globalRabbitMQProxy and starts handleReconnect in a new goroutine.
// Later calls skip all of that and return the proxy that already exists;
// their ctx and rCfg have no effect on the connection.
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
	uri := generateRabbitMQURI(rCfg)

	setup := func() {
		proxy := &RabbitMQProxy{Conn: initRabbitMQ(ctx, uri)}
		_globalRabbitMQProxy = proxy
		// Watch the connection in the background and re-dial when it drops.
		go proxy.handleReconnect(ctx, uri)
	}
	rabbitMQOnce.Do(setup)

	return _globalRabbitMQProxy
}
// initRabbitMQ dials the rabbitMQ server at rabbitMQURI and returns the
// open connection.
//
// rabbitMQURI embeds the account password, so only a redacted form of it
// is written to the log. The function panics if the dial fails.
func initRabbitMQ(ctx context.Context, rabbitMQURI string) *amqp.Connection {
	// Never log the raw URI: it carries the password in its userinfo part.
	// A parse failure is not logged either, because url.Error echoes the
	// offending URL in its message.
	loggableURI := "<unparseable URI>"
	if parsed, err := url.Parse(rabbitMQURI); err == nil {
		loggableURI = parsed.Redacted()
	}
	logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", loggableURI))

	conn, err := amqp.Dial(rabbitMQURI)
	if err != nil {
		panic(fmt.Errorf("failed to connect to rabbitMQ: %w", err))
	}
	return conn
}
// handleReconnect watches the proxy's current connection and replaces it
// with a freshly dialed one whenever a close notification arrives.
//
// It is meant to run in its own goroutine and returns when either
//   - ctx is cancelled (checked while waiting for a close notification and
//     while waiting between dial attempts), or
//   - the notification channel is closed without delivering a value.
//
// After a close notification it waits 5 seconds, dials rabbitMQURI, and
// repeats until a dial succeeds; the new connection is stored in p.Conn
// under p.mu. Callers must pass a ctx that lives as long as reconnection
// is wanted.
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
	const reconnectDelay = 5 * time.Second

	for {
		// Capacity 1 lets the amqp library deliver its close notification
		// without blocking even if this goroutine has already returned
		// because ctx was cancelled.
		closeChan := make(chan *amqp.Error, 1)
		// Within this file only this goroutine reassigns p.Conn, so the
		// read here does not race with the write below.
		p.Conn.NotifyClose(closeChan)

		select {
		case <-ctx.Done():
			logger.Info(ctx, "rabbitMQ reconnect watcher stopped", "reason", ctx.Err())
			return
		case err, ok := <-closeChan:
			if !ok {
				logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor")
				return
			}
			if err != nil {
				logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err)
			} else {
				logger.Info(ctx, "rabbitMQ connection closed normally (nil err)")
			}
		}

		// Re-dial until a connection is established or ctx is cancelled.
		for {
			timer := time.NewTimer(reconnectDelay)
			select {
			case <-ctx.Done():
				timer.Stop()
				logger.Info(ctx, "rabbitMQ reconnect watcher stopped", "reason", ctx.Err())
				return
			case <-timer.C:
			}

			newConn, err := amqp.Dial(rabbitMQURI)
			if err != nil {
				logger.Error(ctx, "rabbitMQ reconnect failed", "err", err)
				continue
			}

			p.mu.Lock()
			p.Conn = newConn
			p.mu.Unlock()
			logger.Info(ctx, "rabbitMQ reconnected successfully")
			break
		}
	}
}
// generateRabbitMQURI builds the AMQP connection URI
// "amqp://<user>:<password>@<host>:<port>/" from rCfg.
//
// The user name and password are escaped with the userinfo rules of
// net/url (url.UserPassword), not url.QueryEscape: query escaping encodes
// a space as "+", which a URL parser reads back from the userinfo part as
// a literal plus sign and therefore corrupts credentials containing
// spaces. Host and port are inserted as given.
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
	credentials := url.UserPassword(rCfg.User, rCfg.Password).String()
	return fmt.Sprintf("amqp://%s@%s:%d/", credentials, rCfg.Host, rCfg.Port)
}