// Package event define real time data evnet operation functions package event import ( "context" "encoding/json" "eventRT/database" "eventRT/logger" "eventRT/mq" "github.com/rabbitmq/amqp091-go" "go.mongodb.org/mongo-driver/bson" ) const ( queueName = "event-up-down-queue" exchangeName = "event-exchange" dbName = "eventdb" collectionName = "alarms" ) // ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm func ReceiptUpDownLimitAlarm(ctx context.Context) { conn := mq.GetConn() if conn == nil { logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed") return } channel, err := conn.Channel() if err != nil { logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err) return } defer channel.Close() err = channel.QueueBind( queueName, // 队列名 "event.#.updown.*", exchangeName, // 交换机名 false, nil, ) if err != nil { logger.Error(ctx, "channel bind queue and exchange failed", "error", err) return } err = channel.Qos(1, 0, false) if err != nil { logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err) } // registered consumer msgs, err := channel.Consume( queueName, // queue "", // consumer tag false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { logger.Error(ctx, "failed to register a consumer", "error", err) return } logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ") for { select { case <-ctx.Done(): logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel") return case msg, ok := <-msgs: if !ok { logger.Error(ctx, "message channel closed, exiting consumer loop") return } processAlarmEventMessage(ctx, msg) } } } func processAlarmEventMessage(ctx context.Context, msg amqp091.Delivery) { logger.Info(ctx, "received event alarm from modelRT up and down limit compute process") mongodbClient := database.GetMongoClient() var alarmEvent EventRecord err := json.Unmarshal(msg.Body, &alarmEvent) if err != nil { logger.Error(ctx, "unmarshal alarm event message failed", "error", err) return } // TODO 根据业务逻辑进行进一步处理,如写入数据库、推送前端等 _, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) return } // TODO 尝试跳过中间结构体的方式插入到 mongoDB 中 var doc bson.D err = bson.UnmarshalExtJSON(msg.Body, true, &doc) if err != nil { logger.Error(ctx, "unmarshal alarm event message failed", "error", err) } result, err := mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) return } err = msg.Ack(false) if err != nil { logger.Error(ctx, "ack alarm event message failed", "error", err) return } logger.Info(ctx, "alarm event inserted into database", "result", result) }