186 lines
4.8 KiB
Go
186 lines
4.8 KiB
Go
// Package mq defines message queue operation functions.
|
|
package mq
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"modelRT/config"
|
|
"modelRT/logger"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/youmark/pkcs8"
|
|
)
|
|
|
|
var (
|
|
_globalRabbitMQProxy *RabbitMQProxy
|
|
rabbitMQOnce sync.Once
|
|
)
|
|
|
|
// RabbitMQProxy define stuct of rabbitMQ connection proxy
|
|
type RabbitMQProxy struct {
|
|
Conn *amqp.Connection
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// RabbitMQCertConf is the struct of the rabbitMQ connection certificate
// config. Its fields are copied into the tls.Config built by initRabbitMQ.
type RabbitMQCertConf struct {
	// serverName is used as tls.Config.ServerName.
	serverName string
	// insecureSkipVerify is used as tls.Config.InsecureSkipVerify.
	insecureSkipVerify bool
	// clientCert is the client certificate/key pair presented to the server.
	clientCert tls.Certificate
	// caCertPool is used as tls.Config.RootCAs.
	caCertPool *x509.CertPool
}
|
|
|
|
// GetConn define func to return the rabbitMQ connection
|
|
func GetConn() *amqp.Connection {
|
|
_globalRabbitMQProxy.mu.Lock()
|
|
defer _globalRabbitMQProxy.mu.Unlock()
|
|
return _globalRabbitMQProxy.Conn
|
|
}
|
|
|
|
// InitRabbitProxy return instance of rabbitMQ connection
|
|
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
|
|
amqpURI := generateRabbitMQURI(rCfg)
|
|
certConf, err := readCertFiles(ctx, rCfg)
|
|
if err != nil {
|
|
logger.Error(ctx, "read rabbitMQ cert files failed", "error", err)
|
|
panic(err)
|
|
}
|
|
rabbitMQOnce.Do(func() {
|
|
conn := initRabbitMQ(ctx, amqpURI, certConf)
|
|
_globalRabbitMQProxy = &RabbitMQProxy{Conn: conn}
|
|
go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI)
|
|
})
|
|
return _globalRabbitMQProxy
|
|
}
|
|
|
|
// initRabbitMQ return instance of rabbitMQ connection
|
|
func initRabbitMQ(ctx context.Context, rabbitMQURI string, certConf *RabbitMQCertConf) *amqp.Connection {
|
|
logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI))
|
|
|
|
tlsConfig := &tls.Config{
|
|
Certificates: []tls.Certificate{certConf.clientCert},
|
|
RootCAs: certConf.caCertPool,
|
|
InsecureSkipVerify: certConf.insecureSkipVerify,
|
|
ServerName: certConf.serverName,
|
|
}
|
|
|
|
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
|
TLSClientConfig: tlsConfig,
|
|
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
|
Heartbeat: 10 * time.Second,
|
|
})
|
|
if err != nil {
|
|
logger.Error(ctx, "Error opening connection: ", "error", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
return conn
|
|
}
|
|
|
|
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
|
|
for {
|
|
closeChan := make(chan *amqp.Error)
|
|
GetConn().NotifyClose(closeChan)
|
|
|
|
err, ok := <-closeChan
|
|
|
|
if !ok {
|
|
logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor")
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err)
|
|
} else {
|
|
logger.Info(ctx, "rabbitMQ connection closed normally (nil err)")
|
|
}
|
|
|
|
for {
|
|
time.Sleep(5 * time.Second)
|
|
newConn, err := amqp.Dial(rabbitMQURI)
|
|
if err == nil {
|
|
p.mu.Lock()
|
|
p.Conn = newConn
|
|
p.mu.Unlock()
|
|
logger.Info(ctx, "rabbitMQ reconnected successfully")
|
|
break
|
|
}
|
|
logger.Error(ctx, "rabbitMQ reconnect failed", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
|
|
user := url.QueryEscape(rCfg.User)
|
|
password := url.QueryEscape(rCfg.Password)
|
|
|
|
amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/",
|
|
user,
|
|
password,
|
|
rCfg.Host,
|
|
rCfg.Port,
|
|
)
|
|
return amqpURI
|
|
}
|
|
|
|
func readCertFiles(ctx context.Context, rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) {
|
|
var initFailedFlag bool
|
|
certConf := RabbitMQCertConf{
|
|
insecureSkipVerify: rCfg.InsecureSkipVerify,
|
|
}
|
|
|
|
caCert, err := os.ReadFile(rCfg.CACertPath)
|
|
if err != nil {
|
|
logger.Error(ctx, "read server ca file failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
caCertPool := x509.NewCertPool()
|
|
caCertPool.AppendCertsFromPEM(caCert)
|
|
certConf.caCertPool = caCertPool
|
|
|
|
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
|
|
if err != nil {
|
|
logger.Error(ctx, "read private key file failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
|
|
block, _ := pem.Decode(keyData)
|
|
privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
|
|
if err != nil {
|
|
logger.Error(ctx, "parse private key failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
|
|
pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
|
|
if err != nil {
|
|
logger.Error(ctx, "parse private key failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
pemBlock := &pem.Block{Type: "PRIVATE KEY", Bytes: pemBytes}
|
|
|
|
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
|
|
if err != nil {
|
|
logger.Error(ctx, "parse private key failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock))
|
|
if err != nil {
|
|
logger.Error(ctx, "load client cert failed", "error", err)
|
|
initFailedFlag = true
|
|
}
|
|
certConf.serverName = rCfg.ServerName
|
|
certConf.clientCert = clientCert
|
|
if initFailedFlag {
|
|
return nil, fmt.Errorf("rabbitMQ cert files init failed")
|
|
}
|
|
return &certConf, nil
|
|
}
|