From 1c385ee60d40606aa0fb4a344a698e383f662c3c Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 11 Feb 2026 16:43:42 +0800 Subject: [PATCH] optimize code of rabbitmq connection and event alarm struct --- mq/publish_event.go | 7 ++++--- mq/rabbitmq_init.go | 18 +++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/mq/publish_event.go b/mq/publish_event.go index 7984741..2ea438a 100644 --- a/mq/publish_event.go +++ b/mq/publish_event.go @@ -19,7 +19,7 @@ const ( deadQueueName = "event-alarm-dead-letter-queue" ) -func initChannel(ctx context.Context) (*amqp.Channel, error) { +func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { var channel *amqp.Channel var err error @@ -70,8 +70,9 @@ func initChannel(ctx context.Context) (*amqp.Channel, error) { return channel, nil } -func pushEventToRabbitMQ(ctx context.Context, msgChan chan string) { - channel, err := initChannel(ctx) +// PushEventToRabbitMQ define func to push event alarm message to rabbitMQ +func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) { + channel, err := initEventAlarmChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) return diff --git a/mq/rabbitmq_init.go b/mq/rabbitmq_init.go index 511547a..96c4fc0 100644 --- a/mq/rabbitmq_init.go +++ b/mq/rabbitmq_init.go @@ -7,7 +7,6 @@ import ( "crypto/x509" "encoding/pem" "fmt" - "net/url" "os" "sync" "time" @@ -28,7 +27,7 @@ var ( type RabbitMQProxy struct { tlsConf *tls.Config conn *amqp.Connection - cancel context.CancelFunc // 增加这个用于停止重连协程 + cancel context.CancelFunc mu sync.Mutex } @@ -148,12 +147,17 @@ func CloseRabbitProxy() { } func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { - user := url.QueryEscape(rCfg.User) - password := url.QueryEscape(rCfg.Password) + // TODO 考虑拆分用户名密码配置项,兼容不同认证方式 + // user := url.QueryEscape(rCfg.User) + // password := url.QueryEscape(rCfg.Password) - amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", - user, - password, + // amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", + // user, + // password, + // rCfg.Host, + // rCfg.Port, + // ) + amqpURI := fmt.Sprintf("amqps://%s:%d/", rCfg.Host, rCfg.Port, )