optimize code of rabbitmq connection and event alarm struct

This commit is contained in:
douxu 2026-02-11 16:43:42 +08:00
parent 6618209bcc
commit 1c385ee60d
2 changed files with 15 additions and 10 deletions

View File

@ -19,7 +19,7 @@ const (
deadQueueName = "event-alarm-dead-letter-queue" deadQueueName = "event-alarm-dead-letter-queue"
) )
func initChannel(ctx context.Context) (*amqp.Channel, error) { func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) {
var channel *amqp.Channel var channel *amqp.Channel
var err error var err error
@ -70,8 +70,9 @@ func initChannel(ctx context.Context) (*amqp.Channel, error) {
return channel, nil return channel, nil
} }
func pushEventToRabbitMQ(ctx context.Context, msgChan chan string) { // PushEventToRabbitMQ define func to push event alarm message to rabbitMQ
channel, err := initChannel(ctx) func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) {
channel, err := initEventAlarmChannel(ctx)
if err != nil { if err != nil {
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
return return

View File

@ -7,7 +7,6 @@ import (
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"net/url"
"os" "os"
"sync" "sync"
"time" "time"
@ -28,7 +27,7 @@ var (
type RabbitMQProxy struct { type RabbitMQProxy struct {
tlsConf *tls.Config tlsConf *tls.Config
conn *amqp.Connection conn *amqp.Connection
cancel context.CancelFunc // 增加这个用于停止重连协程 cancel context.CancelFunc
mu sync.Mutex mu sync.Mutex
} }
@ -148,12 +147,17 @@ func CloseRabbitProxy() {
} }
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
user := url.QueryEscape(rCfg.User) // TODO 考虑拆分用户名密码配置项,兼容不同认证方式
password := url.QueryEscape(rCfg.Password) // user := url.QueryEscape(rCfg.User)
// password := url.QueryEscape(rCfg.Password)
amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", // amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
user, // user,
password, // password,
// rCfg.Host,
// rCfg.Port,
// )
amqpURI := fmt.Sprintf("amqps://%s:%d/",
rCfg.Host, rCfg.Host,
rCfg.Port, rCfg.Port,
) )