eventRT/event/up_down_limit_alarm.go

103 lines
2.7 KiB
Go

// Package event defines real-time data event operation functions.
package event
import (
"context"
"encoding/json"
"eventRT/database"
"eventRT/logger"
"eventRT/mq"
"go.mongodb.org/mongo-driver/bson"
)
const (
// queueName is the rabbitMQ queue name passed to channel.Consume when the
// alarm consumer is registered.
queueName = "event-alarm-queue"
)
// ReceiptUpDownLimitAlarm consumes up/down limit alarm events from the
// rabbitMQ queue named by queueName and hands every message body to
// processAlarmMessage.
//
// It blocks until ctx is cancelled or the delivery channel is closed. Setup
// failures (no connection, channel cannot be opened, consumer cannot be
// registered) are logged and cause an immediate return.
//
// Deliveries are acknowledged manually, after processing, so that the
// prefetch limit of one requested through Qos actually applies and a message
// that was delivered but not yet handled is not silently dropped when the
// consumer stops.
func ReceiptUpDownLimitAlarm(ctx context.Context) {
	conn := mq.GetConn()
	if conn == nil {
		logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed")
		return
	}

	channel, err := conn.Channel()
	if err != nil {
		logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err)
		return
	}
	defer channel.Close()

	// Best effort: a failed Qos call only means the prefetch limit is not
	// applied, so the consumer keeps going after logging the problem.
	if err := channel.Qos(1, 0, false); err != nil {
		logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err)
	}

	// Register the consumer.
	msgs, err := channel.Consume(
		queueName, // queue
		"",        // consumer tag
		false,     // auto-ack: disabled, each delivery is acked after processing
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		logger.Error(ctx, "failed to register a consumer", "error", err)
		return
	}

	logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ")
	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel")
			return
		case msg, ok := <-msgs:
			if !ok {
				logger.Error(ctx, "message channel closed, exiting consumer loop")
				return
			}
			processAlarmMessage(ctx, msg.Body)
			// processAlarmMessage logs its own failures and reports nothing
			// back, so the delivery is always acked; this also keeps a
			// malformed message from being redelivered over and over.
			if err := msg.Ack(false); err != nil {
				logger.Error(ctx, "ack alarm message failed", "error", err)
			}
		}
	}
}
// processAlarmMessage decodes one alarm event message and stores it in the
// "alarms" collection of the "eventdb" MongoDB database.
//
// body is the raw message payload. It is decoded twice: with encoding/json
// into an EventRecord, and with bson.UnmarshalExtJSON into a bson.D. Each
// decoded form is inserted into the collection. Any decode or insert failure
// is logged and stops processing of the message; nothing is returned to the
// caller.
//
// NOTE(review): the two inserts store the same alarm twice in the same
// collection. The second one looks like an experiment (see the TODO below);
// confirm which of the two should remain.
func processAlarmMessage(ctx context.Context, body []byte) {
	logger.Info(ctx, "received event alarm from modelRT up and down limit compute process")

	mongodbClient := database.GetMongoClient()

	var alarmEvent EventRecord
	if err := json.Unmarshal(body, &alarmEvent); err != nil {
		logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
		return
	}

	// TODO: further processing according to business logic, such as writing
	// to the database or pushing to the frontend.
	logger.Info(ctx, "alarm message processed successfully", "data_len", len(body))

	// Both inserts target the same collection, so resolve the handle once.
	alarms := mongodbClient.Database("eventdb").Collection("alarms")

	if _, err := alarms.InsertOne(ctx, alarmEvent); err != nil {
		logger.Error(ctx, "failed to insert alarm event into database", "error", err)
		return
	}

	// TODO: try inserting into mongoDB without going through the intermediate
	// struct.
	var doc bson.D
	if err := bson.UnmarshalExtJSON(body, true, &doc); err != nil {
		logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
		// Stop here: doc is still empty, and inserting it would store a
		// blank document.
		return
	}

	result, err := alarms.InsertOne(ctx, doc)
	if err != nil {
		logger.Error(ctx, "failed to insert alarm event into database", "error", err)
		return
	}
	logger.Info(ctx, "alarm event inserted into database", "result", result)
}