102 lines
2.3 KiB
Go
102 lines
2.3 KiB
Go
// Package mq defines message queue operation functions.
|
|
package mq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"eventRT/config"
|
|
"eventRT/logger"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
var (
|
|
_globalRabbitMQProxy *RabbitMQProxy
|
|
rabbitMQOnce sync.Once
|
|
)
|
|
|
|
// RabbitMQProxy define stuct of rabbitMQ connection proxy
|
|
type RabbitMQProxy struct {
|
|
Conn *amqp.Connection
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// GetConn define func to return the rabbitMQ connection
|
|
func GetConn() *amqp.Connection {
|
|
_globalRabbitMQProxy.mu.Lock()
|
|
defer _globalRabbitMQProxy.mu.Unlock()
|
|
return _globalRabbitMQProxy.Conn
|
|
}
|
|
|
|
// InitRabbitProxy return instance of rabbitMQ connection
|
|
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
|
|
amqpURI := generateRabbitMQURI(rCfg)
|
|
rabbitMQOnce.Do(func() {
|
|
conn := initRabbitMQ(ctx, amqpURI)
|
|
_globalRabbitMQProxy = &RabbitMQProxy{Conn: conn}
|
|
go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI)
|
|
})
|
|
return _globalRabbitMQProxy
|
|
}
|
|
|
|
// initRabbitMQ return instance of rabbitMQ connection
|
|
func initRabbitMQ(ctx context.Context, rabbitMQURI string) *amqp.Connection {
|
|
logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI))
|
|
conn, err := amqp.Dial(rabbitMQURI)
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed to connect to rabbitMQ: %w", err))
|
|
}
|
|
|
|
return conn
|
|
}
|
|
|
|
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
|
|
for {
|
|
closeChan := make(chan *amqp.Error)
|
|
GetConn().NotifyClose(closeChan)
|
|
|
|
err, ok := <-closeChan
|
|
|
|
if !ok {
|
|
logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor")
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err)
|
|
} else {
|
|
logger.Info(ctx, "rabbitMQ connection closed normally (nil err)")
|
|
}
|
|
|
|
for {
|
|
time.Sleep(5 * time.Second)
|
|
newConn, err := amqp.Dial(rabbitMQURI)
|
|
if err == nil {
|
|
p.mu.Lock()
|
|
p.Conn = newConn
|
|
p.mu.Unlock()
|
|
logger.Info(ctx, "rabbitMQ reconnected successfully")
|
|
break
|
|
}
|
|
logger.Error(ctx, "rabbitMQ reconnect failed", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
|
|
user := url.QueryEscape(rCfg.User)
|
|
password := url.QueryEscape(rCfg.Password)
|
|
|
|
amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/",
|
|
user,
|
|
password,
|
|
rCfg.Host,
|
|
rCfg.Port,
|
|
)
|
|
return amqpURI
|
|
}
|