From 2f7cd5a8fcd22c92a1fe851a9c86df8b673aecbf Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 8 May 2026 15:43:54 +0800 Subject: [PATCH] feat: add event lifecycle API and UI fanout publishing - extract event status, MQ exchange/queue, and DB name constants to constants/event.go - add bson tags and IsPersisted flag to EventRecord for proper MongoDB serialization - publish persisted alarm events to UI consumers via RabbitMQ fanout exchange - add GET /events/:event_uuid handler to query a single event from MongoDB - add PATCH confirm/close handlers with atomic FindOneAndUpdate status transitions - register /events routes in main.go --- constants/event.go | 29 ++++++++ event/event.go | 34 ++++----- event/publish_ui_event.go | 83 ++++++++++++++++++++++ event/up_down_limit_alarm.go | 39 +++++----- handler/event_query.go | 40 +++++++++++ handler/event_status.go | 133 +++++++++++++++++++++++++++++++++++ main.go | 8 +++ 7 files changed, 329 insertions(+), 37 deletions(-) create mode 100644 constants/event.go create mode 100644 event/publish_ui_event.go create mode 100644 handler/event_query.go create mode 100644 handler/event_status.go diff --git a/constants/event.go b/constants/event.go new file mode 100644 index 0000000..be70758 --- /dev/null +++ b/constants/event.go @@ -0,0 +1,29 @@ +// Package constants define constant variable +package constants + +const ( + // EventStatusHappened define status for event record when event just happened, no data attached yet + EventStatusHappened = iota + // EventStatusDataAttached define status for event record when event data attached, ready to be sent + EventStatusDataAttached + // EventStatusReported define status for event record when event reported to downstream + EventStatusReported + // EventStatusConfirmed define status for event record when event confirmed by operator or CIM + EventStatusConfirmed + // EventStatusClosed define status for event record when event closed due to condition recovery or manual close + EventStatusClosed +) + +const ( + // EventUIExchangeName define exchange name for pushing events to UI consumers + EventUIExchangeName = "event-ui-exchange" + // EventUIQueueName define queue name for UI consumers to subscribe to events + EventUIQueueName = "event-ui-queue" +) + +const ( + // EventDBName define MongoDB database name for event storage + EventDBName = "eventdb" + // EventCollectionName define MongoDB collection name for alarm event records + EventCollectionName = "alarms" +) diff --git a/event/event.go b/event/event.go index 6740251..0ab1007 100644 --- a/event/event.go +++ b/event/event.go @@ -4,36 +4,38 @@ package event // EventRecord define struct for CIM event record type EventRecord struct { // 事件名称 - EventName string `json:"event"` + EventName string `json:"event" bson:"event"` // 事件唯一标识符 - EventUUID string `json:"event_uuid"` + EventUUID string `json:"event_uuid" bson:"event_uuid"` // 事件类型 - Type int `json:"type"` + Type int `json:"type" bson:"type"` // 事件优先级 (0-9) - Priority int `json:"priority"` + Priority int `json:"priority" bson:"priority"` // 事件状态 - Status int `json:"status"` + Status int `json:"status" bson:"status"` + // 是否已持久化到数据库,由 eventRT 消费并落库后置为 true + IsPersisted bool `json:"is_persisted" bson:"is_persisted"` // 可选模板参数 - Category string `json:"category,omitempty"` + Category string `json:"category,omitempty" bson:"category,omitempty"` // 毫秒级时间戳 (Unix epoch) - Timestamp int64 `json:"timestamp"` + Timestamp int64 `json:"timestamp" bson:"timestamp"` // 事件来源 (station, platform, msa) - From string `json:"from"` + From string `json:"from" bson:"from"` // 事件场景描述对象 (如阈值、当前值) - Condition map[string]any `json:"condition"` + Condition map[string]any `json:"condition" bson:"condition"` // 与事件相关的订阅信息 - AttachedSubscriptions []any `json:"attached_subscriptions"` + AttachedSubscriptions []any `json:"attached_subscriptions" bson:"attached_subscriptions"` // 事件分析结果对象 - Result map[string]any `json:"result,omitempty"` + Result map[string]any `json:"result,omitempty" bson:"result,omitempty"` // 操作历史记录 (CIM ActivityRecord) - Operations []OperationRecord `json:"operations"` + Operations []OperationRecord `json:"operations" bson:"operations"` // 子站告警原始数据 (CIM Alarm 数据) - Origin map[string]any `json:"origin,omitempty"` + Origin map[string]any `json:"origin,omitempty" bson:"origin,omitempty"` } // OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等 type OperationRecord struct { - Action string `json:"action"` // 执行的动作,如 "acknowledgment" - Op string `json:"op"` // 操作人/操作账号标识 - TS int64 `json:"ts"` // 操作发生的毫秒时间戳 + Action string `json:"action" bson:"action"` // 执行的动作,如 "acknowledgment" + Op string `json:"op" bson:"op"` // 操作人/操作账号标识 + TS int64 `json:"ts" bson:"ts"` // 操作发生的毫秒时间戳 } diff --git a/event/publish_ui_event.go b/event/publish_ui_event.go new file mode 100644 index 0000000..55d5cdc --- /dev/null +++ b/event/publish_ui_event.go @@ -0,0 +1,83 @@ +// Package event define real time data evnet operation functions +package event + +import ( + "context" + "encoding/json" + "time" + + "eventRT/constants" + "eventRT/logger" + "eventRT/mq" + + amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel" +) + +// initUIEventChannel declares the fanout exchange and queue used by UI consumers, +// then returns a channel ready for publishing. +func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) { + ch, err := mq.GetConn().Channel() + if err != nil { + logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err) + return nil, err + } + + err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare UI event exchange failed", "error", err) + return nil, err + } + + _, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare UI event queue failed", "error", err) + return nil, err + } + + // fanout exchange routes to all bound queues regardless of routing key + err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil) + if err != nil { + logger.Error(ctx, "bind UI event queue to exchange failed", "error", err) + return nil, err + } + + return ch, nil +} + +// PublishEventToUI sets the event status to Reported and publishes the record +// to the UI-facing fanout exchange. Intended to be called after the event has +// been successfully persisted to the database. +func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error { + record.Status = constants.EventStatusReported + + body, err := json.Marshal(record) + if err != nil { + logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err) + return err + } + + headers := amqp.Table{} + otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers)) + + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err = ch.PublishWithContext(pubCtx, + constants.EventUIExchangeName, + "", // fanout exchange ignores routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Headers: headers, + }) + if err != nil { + logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err) + return err + } + + logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status) + return nil +} diff --git a/event/up_down_limit_alarm.go b/event/up_down_limit_alarm.go index 5970b51..7dca9fe 100644 --- a/event/up_down_limit_alarm.go +++ b/event/up_down_limit_alarm.go @@ -5,12 +5,12 @@ import ( "context" "encoding/json" + "eventRT/constants" "eventRT/database" "eventRT/logger" "eventRT/mq" amqp "github.com/rabbitmq/amqp091-go" - "go.mongodb.org/mongo-driver/bson" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) @@ -42,10 +42,8 @@ func (c amqpHeaderCarrier) Keys() []string { var _ propagation.TextMapCarrier = amqpHeaderCarrier{} const ( - queueName = "event-up-down-queue" - exchangeName = "event-exchange" - dbName = "eventdb" - collectionName = "alarms" + queueName = "event-up-down-queue" + exchangeName = "event-exchange" ) // ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm @@ -63,6 +61,13 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) { } defer channel.Close() + uiChannel, err := initUIEventChannel(ctx) + if err != nil { + logger.Error(ctx, "init UI event channel failed", "error", err) + return + } + defer uiChannel.Close() + err = channel.QueueBind( queueName, // 队列名 "event.#.updown.*", @@ -107,12 +112,12 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) { logger.Error(ctx, "message channel closed, exiting consumer loop") return } - processAlarmEventMessage(ctx, msg) + processAlarmEventMessage(ctx, msg, uiChannel) } } } -func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) { +func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) { // extract upstream trace context injected by modelRT into AMQP headers ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers)) ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage") @@ -128,20 +133,8 @@ func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) { return } - // TODO 根据业务逻辑进行进一步处理,如写入数据库、推送前端等 - _, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent) - if err != nil { - logger.Error(ctx, "failed to insert alarm event into database", "error", err) - return - } - - // TODO 尝试跳过中间结构体的方式插入到 mongoDB 中 - var doc bson.D - err = bson.UnmarshalExtJSON(msg.Body, true, &doc) - if err != nil { - logger.Error(ctx, "unmarshal alarm event message failed", "error", err) - } - result, err := mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc) + alarmEvent.IsPersisted = true + result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) return @@ -152,4 +145,8 @@ func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) { return } logger.Info(ctx, "alarm event inserted into database", "result", result) + + if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil { + logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err) + } } diff --git a/handler/event_query.go b/handler/event_query.go new file mode 100644 index 0000000..da4014f --- /dev/null +++ b/handler/event_query.go @@ -0,0 +1,40 @@ +// Package handler define HTTP handler functions for eventRT service +package handler + +import ( + "errors" + "net/http" + + "eventRT/constants" + "eventRT/database" + "eventRT/event" + "eventRT/logger" + + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// GetEventHandler handles GET /events/:event_uuid +func GetEventHandler(c *gin.Context) { + ctx := c.Request.Context() + eventUUID := c.Param("event_uuid") + + var record event.EventRecord + err := database.GetMongoClient(). + Database(constants.EventDBName). + Collection(constants.EventCollectionName). + FindOne(ctx, bson.M{"event_uuid": eventUUID}). + Decode(&record) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) + return + } + logger.Error(ctx, "query event by uuid failed", "event_uuid", eventUUID, "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"}) + return + } + + c.JSON(http.StatusOK, record) +} diff --git a/handler/event_status.go b/handler/event_status.go new file mode 100644 index 0000000..f2c28ed --- /dev/null +++ b/handler/event_status.go @@ -0,0 +1,133 @@ +// Package handler define HTTP handler functions for eventRT service +package handler + +import ( + "context" + "errors" + "net/http" + "time" + + "eventRT/constants" + "eventRT/database" + "eventRT/event" + "eventRT/logger" + + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var ( + errEventNotFound = errors.New("event not found") + errInvalidStatusTransition = errors.New("invalid status transition: precondition status mismatch") +) + +type updateStatusRequest struct { + Op string `json:"op" binding:"required"` +} + +// ConfirmEventHandler handles PATCH /events/:event_uuid/confirm +// Transitions event status from EventStatusReported to EventStatusConfirmed. +func ConfirmEventHandler(c *gin.Context) { + handleStatusTransition(c, + constants.EventStatusReported, + constants.EventStatusConfirmed, + "confirm", + ) +} + +// CloseEventHandler handles PATCH /events/:event_uuid/close +// Transitions event status from EventStatusConfirmed to EventStatusClosed. +func CloseEventHandler(c *gin.Context) { + handleStatusTransition(c, + constants.EventStatusConfirmed, + constants.EventStatusClosed, + "close", + ) +} + +func handleStatusTransition(c *gin.Context, requiredStatus, newStatus int, action string) { + ctx := c.Request.Context() + eventUUID := c.Param("event_uuid") + + var req updateStatusRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "field op is required"}) + return + } + + updated, err := transitionEventStatus(ctx, eventUUID, req.Op, requiredStatus, newStatus, action) + if err != nil { + switch { + case errors.Is(err, errEventNotFound): + c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) + case errors.Is(err, errInvalidStatusTransition): + c.JSON(http.StatusConflict, gin.H{ + "error": "invalid status transition", + "required_status": requiredStatus, + }) + default: + logger.Error(ctx, "update event status failed", + "event_uuid", eventUUID, + "action", action, + "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"}) + } + return + } + + c.JSON(http.StatusOK, updated) +} + +// transitionEventStatus atomically validates the precondition status and applies the transition. +// Uses FindOneAndUpdate with a status filter so the check and update are a single MongoDB operation. +func transitionEventStatus(ctx context.Context, eventUUID, op string, requiredStatus, newStatus int, action string) (*event.EventRecord, error) { + col := database.GetMongoClient(). + Database(constants.EventDBName). + Collection(constants.EventCollectionName) + + opRecord := event.OperationRecord{ + Action: action, + Op: op, + TS: time.Now().UnixNano() / int64(time.Millisecond), + } + + filter := bson.M{ + "event_uuid": eventUUID, + "status": requiredStatus, + } + update := bson.M{ + "$set": bson.M{"status": newStatus}, + "$push": bson.M{"operations": opRecord}, + } + opts := options.FindOneAndUpdate().SetReturnDocument(options.After) + + var updated event.EventRecord + err := col.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updated) + if err != nil { + if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err + } + // FindOneAndUpdate returned no documents: either the event doesn't exist + // or the current status doesn't match the required precondition. + // Do a follow-up read to distinguish the two cases. + var existing event.EventRecord + findErr := col.FindOne(ctx, bson.M{"event_uuid": eventUUID}).Decode(&existing) + if errors.Is(findErr, mongo.ErrNoDocuments) { + return nil, errEventNotFound + } + if findErr != nil { + return nil, findErr + } + return nil, errInvalidStatusTransition + } + + logger.Info(ctx, "event status transitioned", + "event_uuid", eventUUID, + "from", requiredStatus, + "to", newStatus, + "op", op) + + return &updated, nil +} diff --git a/main.go b/main.go index 3588090..8e71591 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "eventRT/constants" "eventRT/database" "eventRT/event" + "eventRT/handler" "eventRT/logger" "eventRT/middleware" "eventRT/mq" @@ -108,6 +109,13 @@ func main() { // c.JSON(200, gin.H{"status": "ok"}) // }) + events := engine.Group("/events") + { + events.GET("/:event_uuid", handler.GetEventHandler) + events.PATCH("/:event_uuid/confirm", handler.ConfirmEventHandler) + events.PATCH("/:event_uuid/close", handler.CloseEventHandler) + } + server := http.Server{ Addr: eventRTConfig.ServiceAddr, Handler: engine,