From 4b52e5f3c6298d6ce3673baf118b56e019274e17 Mon Sep 17 00:00:00 2001 From: douxu Date: Sat, 28 Feb 2026 17:38:33 +0800 Subject: [PATCH] optimize code of event record and push rabbitmq func --- constants/event.go | 18 ++++++++ main.go | 2 +- ...vent.go => publish_up_down_limit_event.go} | 42 ++++++++----------- real-time-data/compute_analyzer.go | 1 + 4 files changed, 37 insertions(+), 26 deletions(-) rename mq/{publish_event.go => publish_up_down_limit_event.go} (66%) diff --git a/constants/event.go b/constants/event.go index c9bc90d..435cb2b 100644 --- a/constants/event.go +++ b/constants/event.go @@ -63,3 +63,21 @@ const ( // EventStatusClosed define status for event record when event closed, no matter it's successful or failed EventStatusClosed ) + +const ( + // EventExchangeName define exchange name for event alarm message + EventExchangeName = "event-exchange" + // EventDeadExchangeName define dead letter exchange name for event alarm message + EventDeadExchangeName = "event-dead-letter-exchange" +) + +const ( + // EventUpDownRoutingKey define routing key for up or down limit event alarm message + EventUpDownRoutingKey = "event-up-down-routing-key" + // EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message + EventUpDownDeadRoutingKey = "event-up-down-dead-letter-routing-key" + // EventUpDownQueueName define queue name for up or down limit event alarm message + EventUpDownQueueName = "event-up-down-queue" + // EventUpDownDeadQueueName define dead letter queue name for event alarm message + EventUpDownDeadQueueName = "event-dead-letter-queue" +) diff --git a/main.go b/main.go index 486a7ef..c718793 100644 --- a/main.go +++ b/main.go @@ -153,7 +153,7 @@ func main() { // init rabbitmq connection mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig) // async push event to rabbitMQ - go mq.PushEventToRabbitMQ(ctx, mq.MsgChan) + go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres diff --git a/mq/publish_event.go b/mq/publish_up_down_limit_event.go similarity index 66% rename from mq/publish_event.go rename to mq/publish_up_down_limit_event.go index 7b0ee6f..e1f0bd1 100644 --- a/mq/publish_event.go +++ b/mq/publish_up_down_limit_event.go @@ -5,6 +5,7 @@ import ( "context" "time" + "modelRT/constants" "modelRT/logger" amqp "github.com/rabbitmq/amqp091-go" @@ -17,16 +18,7 @@ func init() { MsgChan = make(chan []byte, 10000) } -const ( - routingKey = "event-alarm-routing-key" - exchangeName = "event-alarm-exchange" - queueName = "event-alarm-queue" - deadRoutingKey = "event-alarm-dead-letter-routing-key" - deadExchangeName = "event-alarm-dead-letter-exchange" - deadQueueName = "event-alarm-dead-letter-queue" -) - -func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { +func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { var channel *amqp.Channel var err error @@ -35,22 +27,22 @@ func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { logger.Error(ctx, "open rabbitMQ server channel failed", "error", err) } - err = channel.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil) + err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter exchange failed", "error", err) } - _, err = channel.QueueDeclare(deadQueueName, true, false, false, false, nil) + _, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter queue failed", "error", err) } - err = channel.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil) + err = channel.QueueBind(constants.EventUpDownDeadQueueName, constants.EventUpDownDeadRoutingKey, constants.EventDeadExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err) } - err = channel.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil) + err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event exchange failed", "error", err) } @@ -58,15 +50,15 @@ func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { args := amqp.Table{ // messages that accumulate to the maximum number will be automatically transferred to the dead letter queue "x-max-length": int32(50), - "x-dead-letter-exchange": deadExchangeName, - "x-dead-letter-routing-key": deadRoutingKey, + "x-dead-letter-exchange": constants.EventDeadExchangeName, + "x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey, } - _, err = channel.QueueDeclare(queueName, true, false, false, false, args) + _, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args) if err != nil { logger.Error(ctx, "declare event queue failed", "error", err) } - err = channel.QueueBind(queueName, routingKey, exchangeName, false, nil) + err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err) } @@ -77,9 +69,9 @@ func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { return channel, nil } -// PushEventToRabbitMQ define func to push event alarm message to rabbitMQ -func PushEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { - channel, err := initEventAlarmChannel(ctx) +// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ +func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { + channel, err := initUpDownLimitEventChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) return @@ -119,10 +111,10 @@ func PushEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { // send event alarm message to rabbitMQ queue pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err = channel.PublishWithContext(pubCtx, - exchangeName, // exchange - routingKey, // routing key - false, // mandatory - false, // immediate + constants.EventExchangeName, // exchange + constants.EventUpDownRoutingKey, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: msg, diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index c1ae86a..e1c5166 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -131,6 +131,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE opts := []event.EventOption{ event.WithConditionValue(triggerValues, conf.Cause), event.WithTEAnalysisResult(firstBreachType), + event.WithCategory(constants.EventUpDownRoutingKey), // TODO 生成 operations并考虑如何放入 event 中 // event.WithOperations(nil) }