diff --git a/event/up_down_limit_alarm.go b/event/up_down_limit_alarm.go index 844c7c5..d2df65f 100644 --- a/event/up_down_limit_alarm.go +++ b/event/up_down_limit_alarm.go @@ -9,11 +9,15 @@ import ( "eventRT/logger" "eventRT/mq" + "github.com/rabbitmq/amqp091-go" "go.mongodb.org/mongo-driver/bson" ) const ( - queueName = "event-alarm-queue" + queueName = "event-up-down-queue" + exchangeName = "event-exchange" + dbName = "eventdb" + collectionName = "alarms" ) // ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm @@ -31,6 +35,18 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) { } defer channel.Close() + err = channel.QueueBind( + queueName, // 队列名 + "event.#.updown.*", + exchangeName, // 交换机名 + false, + nil, + ) + if err != nil { + logger.Error(ctx, "channel bind queue and exchange failed", "error", err) + return + } + err = channel.Qos(1, 0, false) if err != nil { logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err) @@ -40,7 +56,7 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) { msgs, err := channel.Consume( queueName, // queue "", // consumer tag - true, // auto-ack + false, // auto-ack false, // exclusive false, // no-local false, // no-wait @@ -63,24 +79,23 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) { logger.Error(ctx, "message channel closed, exiting consumer loop") return } - processAlarmMessage(ctx, msg.Body) + processAlarmEventMessage(ctx, msg) } } } -func processAlarmMessage(ctx context.Context, body []byte) { +func processAlarmEventMessage(ctx context.Context, msg amqp091.Delivery) { logger.Info(ctx, "received event alarm from modelRT up and down limit compute process") mongodbClient := database.GetMongoClient() var alarmEvent EventRecord - err := json.Unmarshal(body, &alarmEvent) + err := json.Unmarshal(msg.Body, &alarmEvent) if err != nil { logger.Error(ctx, "unmarshal alarm event message failed", "error", err) return } - // TODO: 根据业务逻辑进行进一步处理,如写入数据库、推送前端等 - logger.Info(ctx, "alarm message processed successfully", "data_len", len(body)) + // TODO 根据业务逻辑进行进一步处理,如写入数据库、推送前端等 _, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) @@ -89,14 +104,15 @@ func processAlarmMessage(ctx context.Context, body []byte) { // TODO 尝试跳过中间结构体的方式插入到 mongoDB 中 var doc bson.D - err = bson.UnmarshalExtJSON(body, true, &doc) + err = bson.UnmarshalExtJSON(msg.Body, true, &doc) if err != nil { logger.Error(ctx, "unmarshal alarm event message failed", "error", err) } - result, err := mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, doc) + result, err := mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) return } + msg.Ack(false) logger.Info(ctx, "alarm event inserted into database", "result", result) }