optimize code of receipt event
This commit is contained in:
parent
d239f9bd85
commit
609511c7cd
|
|
@ -9,11 +9,15 @@ import (
|
|||
"eventRT/logger"
|
||||
"eventRT/mq"
|
||||
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
const (
|
||||
queueName = "event-alarm-queue"
|
||||
queueName = "event-up-down-queue"
|
||||
exchangeName = "event-exchange"
|
||||
dbName = "eventdb"
|
||||
collectionName = "alarms"
|
||||
)
|
||||
|
||||
// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm
|
||||
|
|
@ -31,6 +35,18 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) {
|
|||
}
|
||||
defer channel.Close()
|
||||
|
||||
err = channel.QueueBind(
|
||||
queueName, // 队列名
|
||||
"event.#.updown.*",
|
||||
exchangeName, // 交换机名
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "channel bind queue and exchange failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = channel.Qos(1, 0, false)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err)
|
||||
|
|
@ -40,7 +56,7 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) {
|
|||
msgs, err := channel.Consume(
|
||||
queueName, // queue
|
||||
"", // consumer tag
|
||||
true, // auto-ack
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
|
|
@ -63,24 +79,23 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) {
|
|||
logger.Error(ctx, "message channel closed, exiting consumer loop")
|
||||
return
|
||||
}
|
||||
processAlarmMessage(ctx, msg.Body)
|
||||
processAlarmEventMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processAlarmMessage(ctx context.Context, body []byte) {
|
||||
func processAlarmEventMessage(ctx context.Context, msg amqp091.Delivery) {
|
||||
logger.Info(ctx, "received event alarm from modelRT up and down limit compute process")
|
||||
mongodbClient := database.GetMongoClient()
|
||||
|
||||
var alarmEvent EventRecord
|
||||
err := json.Unmarshal(body, &alarmEvent)
|
||||
err := json.Unmarshal(msg.Body, &alarmEvent)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: 根据业务逻辑进行进一步处理,如写入数据库、推送前端等
|
||||
logger.Info(ctx, "alarm message processed successfully", "data_len", len(body))
|
||||
// TODO 根据业务逻辑进行进一步处理,如写入数据库、推送前端等
|
||||
_, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
|
||||
|
|
@ -89,14 +104,15 @@ func processAlarmMessage(ctx context.Context, body []byte) {
|
|||
|
||||
// TODO 尝试跳过中间结构体的方式插入到 mongoDB 中
|
||||
var doc bson.D
|
||||
err = bson.UnmarshalExtJSON(body, true, &doc)
|
||||
err = bson.UnmarshalExtJSON(msg.Body, true, &doc)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
|
||||
}
|
||||
result, err := mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, doc)
|
||||
result, err := mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
|
||||
return
|
||||
}
|
||||
msg.Ack(false)
|
||||
logger.Info(ctx, "alarm event inserted into database", "result", result)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue