feat: add event lifecycle API and UI fanout publishing - extract event status, MQ exchange/queue, and DB name constants to constants/event.go - add bson tags and IsPersisted flag to EventRecord for proper MongoDB serialization

- publish persisted alarm events to UI consumers via RabbitMQ fanout exchange                                                 - add GET /events/:event_uuid handler to query a single event from MongoDB
  - add PATCH confirm/close handlers with atomic FindOneAndUpdate status transitions                                            - register /events routes in main.go
This commit is contained in:
douxu 2026-05-08 15:43:54 +08:00
parent 44ded93411
commit 2f7cd5a8fc
7 changed files with 329 additions and 37 deletions

29
constants/event.go Normal file
View File

@ -0,0 +1,29 @@
// Package constants define constant variable
package constants
const (
// EventStatusHappened define status for event record when event just happened, no data attached yet
EventStatusHappened = iota
// EventStatusDataAttached define status for event record when event data attached, ready to be sent
EventStatusDataAttached
// EventStatusReported define status for event record when event reported to downstream
EventStatusReported
// EventStatusConfirmed define status for event record when event confirmed by operator or CIM
EventStatusConfirmed
// EventStatusClosed define status for event record when event closed due to condition recovery or manual close
EventStatusClosed
)
const (
// EventUIExchangeName define exchange name for pushing events to UI consumers
EventUIExchangeName = "event-ui-exchange"
// EventUIQueueName define queue name for UI consumers to subscribe to events
EventUIQueueName = "event-ui-queue"
)
const (
// EventDBName define MongoDB database name for event storage
EventDBName = "eventdb"
// EventCollectionName define MongoDB collection name for alarm event records
EventCollectionName = "alarms"
)

View File

@ -4,36 +4,38 @@ package event
// EventRecord define struct for CIM event record
type EventRecord struct {
// 事件名称
EventName string `json:"event"`
EventName string `json:"event" bson:"event"`
// 事件唯一标识符
EventUUID string `json:"event_uuid"`
EventUUID string `json:"event_uuid" bson:"event_uuid"`
// 事件类型
Type int `json:"type"`
Type int `json:"type" bson:"type"`
// 事件优先级 (0-9)
Priority int `json:"priority"`
Priority int `json:"priority" bson:"priority"`
// 事件状态
Status int `json:"status"`
Status int `json:"status" bson:"status"`
// 是否已持久化到数据库,由 eventRT 消费并落库后置为 true
IsPersisted bool `json:"is_persisted" bson:"is_persisted"`
// 可选模板参数
Category string `json:"category,omitempty"`
Category string `json:"category,omitempty" bson:"category,omitempty"`
// 毫秒级时间戳 (Unix epoch)
Timestamp int64 `json:"timestamp"`
Timestamp int64 `json:"timestamp" bson:"timestamp"`
// 事件来源 (station, platform, msa)
From string `json:"from"`
From string `json:"from" bson:"from"`
// 事件场景描述对象 (如阈值、当前值)
Condition map[string]any `json:"condition"`
Condition map[string]any `json:"condition" bson:"condition"`
// 与事件相关的订阅信息
AttachedSubscriptions []any `json:"attached_subscriptions"`
AttachedSubscriptions []any `json:"attached_subscriptions" bson:"attached_subscriptions"`
// 事件分析结果对象
Result map[string]any `json:"result,omitempty"`
Result map[string]any `json:"result,omitempty" bson:"result,omitempty"`
// 操作历史记录 (CIM ActivityRecord)
Operations []OperationRecord `json:"operations"`
Operations []OperationRecord `json:"operations" bson:"operations"`
// 子站告警原始数据 (CIM Alarm 数据)
Origin map[string]any `json:"origin,omitempty"`
Origin map[string]any `json:"origin,omitempty" bson:"origin,omitempty"`
}
// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等
type OperationRecord struct {
Action string `json:"action"` // 执行的动作,如 "acknowledgment"
Op string `json:"op"` // 操作人/操作账号标识
TS int64 `json:"ts"` // 操作发生的毫秒时间戳
Action string `json:"action" bson:"action"` // 执行的动作,如 "acknowledgment"
Op string `json:"op" bson:"op"` // 操作人/操作账号标识
TS int64 `json:"ts" bson:"ts"` // 操作发生的毫秒时间戳
}

83
event/publish_ui_event.go Normal file
View File

@ -0,0 +1,83 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"encoding/json"
"time"
"eventRT/constants"
"eventRT/logger"
"eventRT/mq"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
)
// initUIEventChannel declares the fanout exchange and queue used by UI consumers,
// then returns a channel ready for publishing.
func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) {
ch, err := mq.GetConn().Channel()
if err != nil {
logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err)
return nil, err
}
err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event exchange failed", "error", err)
return nil, err
}
_, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event queue failed", "error", err)
return nil, err
}
// fanout exchange routes to all bound queues regardless of routing key
err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil)
if err != nil {
logger.Error(ctx, "bind UI event queue to exchange failed", "error", err)
return nil, err
}
return ch, nil
}
// PublishEventToUI sets the event status to Reported and publishes the record
// to the UI-facing fanout exchange. Intended to be called after the event has
// been successfully persisted to the database.
func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error {
record.Status = constants.EventStatusReported
body, err := json.Marshal(record)
if err != nil {
logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err)
return err
}
headers := amqp.Table{}
otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers))
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = ch.PublishWithContext(pubCtx,
constants.EventUIExchangeName,
"", // fanout exchange ignores routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
Headers: headers,
})
if err != nil {
logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err)
return err
}
logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status)
return nil
}

View File

@ -5,12 +5,12 @@ import (
"context"
"encoding/json"
"eventRT/constants"
"eventRT/database"
"eventRT/logger"
"eventRT/mq"
amqp "github.com/rabbitmq/amqp091-go"
"go.mongodb.org/mongo-driver/bson"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
@ -42,10 +42,8 @@ func (c amqpHeaderCarrier) Keys() []string {
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}
const (
queueName = "event-up-down-queue"
exchangeName = "event-exchange"
dbName = "eventdb"
collectionName = "alarms"
queueName = "event-up-down-queue"
exchangeName = "event-exchange"
)
// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm
@ -63,6 +61,13 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) {
}
defer channel.Close()
uiChannel, err := initUIEventChannel(ctx)
if err != nil {
logger.Error(ctx, "init UI event channel failed", "error", err)
return
}
defer uiChannel.Close()
err = channel.QueueBind(
queueName, // 队列名
"event.#.updown.*",
@ -107,12 +112,12 @@ func ReceiptUpDownLimitAlarm(ctx context.Context) {
logger.Error(ctx, "message channel closed, exiting consumer loop")
return
}
processAlarmEventMessage(ctx, msg)
processAlarmEventMessage(ctx, msg, uiChannel)
}
}
}
func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) {
func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) {
// extract upstream trace context injected by modelRT into AMQP headers
ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers))
ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage")
@ -128,20 +133,8 @@ func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) {
return
}
// TODO 根据业务逻辑进行进一步处理,如写入数据库、推送前端等
_, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
if err != nil {
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
return
}
// TODO 尝试跳过中间结构体的方式插入到 mongoDB 中
var doc bson.D
err = bson.UnmarshalExtJSON(msg.Body, true, &doc)
if err != nil {
logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
}
result, err := mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
alarmEvent.IsPersisted = true
result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent)
if err != nil {
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
return
@ -152,4 +145,8 @@ func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery) {
return
}
logger.Info(ctx, "alarm event inserted into database", "result", result)
if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil {
logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err)
}
}

40
handler/event_query.go Normal file
View File

@ -0,0 +1,40 @@
// Package handler define HTTP handler functions for eventRT service
package handler
import (
"errors"
"net/http"
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// GetEventHandler handles GET /events/:event_uuid
func GetEventHandler(c *gin.Context) {
ctx := c.Request.Context()
eventUUID := c.Param("event_uuid")
var record event.EventRecord
err := database.GetMongoClient().
Database(constants.EventDBName).
Collection(constants.EventCollectionName).
FindOne(ctx, bson.M{"event_uuid": eventUUID}).
Decode(&record)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
c.JSON(http.StatusNotFound, gin.H{"error": "event not found"})
return
}
logger.Error(ctx, "query event by uuid failed", "event_uuid", eventUUID, "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
return
}
c.JSON(http.StatusOK, record)
}

133
handler/event_status.go Normal file
View File

@ -0,0 +1,133 @@
// Package handler define HTTP handler functions for eventRT service
package handler
import (
"context"
"errors"
"net/http"
"time"
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
errEventNotFound = errors.New("event not found")
errInvalidStatusTransition = errors.New("invalid status transition: precondition status mismatch")
)
type updateStatusRequest struct {
Op string `json:"op" binding:"required"`
}
// ConfirmEventHandler handles PATCH /events/:event_uuid/confirm
// Transitions event status from EventStatusReported to EventStatusConfirmed.
func ConfirmEventHandler(c *gin.Context) {
handleStatusTransition(c,
constants.EventStatusReported,
constants.EventStatusConfirmed,
"confirm",
)
}
// CloseEventHandler handles PATCH /events/:event_uuid/close
// Transitions event status from EventStatusConfirmed to EventStatusClosed.
func CloseEventHandler(c *gin.Context) {
handleStatusTransition(c,
constants.EventStatusConfirmed,
constants.EventStatusClosed,
"close",
)
}
func handleStatusTransition(c *gin.Context, requiredStatus, newStatus int, action string) {
ctx := c.Request.Context()
eventUUID := c.Param("event_uuid")
var req updateStatusRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "field op is required"})
return
}
updated, err := transitionEventStatus(ctx, eventUUID, req.Op, requiredStatus, newStatus, action)
if err != nil {
switch {
case errors.Is(err, errEventNotFound):
c.JSON(http.StatusNotFound, gin.H{"error": "event not found"})
case errors.Is(err, errInvalidStatusTransition):
c.JSON(http.StatusConflict, gin.H{
"error": "invalid status transition",
"required_status": requiredStatus,
})
default:
logger.Error(ctx, "update event status failed",
"event_uuid", eventUUID,
"action", action,
"error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
}
return
}
c.JSON(http.StatusOK, updated)
}
// transitionEventStatus atomically validates the precondition status and applies the transition.
// Uses FindOneAndUpdate with a status filter so the check and update are a single MongoDB operation.
func transitionEventStatus(ctx context.Context, eventUUID, op string, requiredStatus, newStatus int, action string) (*event.EventRecord, error) {
col := database.GetMongoClient().
Database(constants.EventDBName).
Collection(constants.EventCollectionName)
opRecord := event.OperationRecord{
Action: action,
Op: op,
TS: time.Now().UnixNano() / int64(time.Millisecond),
}
filter := bson.M{
"event_uuid": eventUUID,
"status": requiredStatus,
}
update := bson.M{
"$set": bson.M{"status": newStatus},
"$push": bson.M{"operations": opRecord},
}
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var updated event.EventRecord
err := col.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updated)
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
// FindOneAndUpdate returned no documents: either the event doesn't exist
// or the current status doesn't match the required precondition.
// Do a follow-up read to distinguish the two cases.
var existing event.EventRecord
findErr := col.FindOne(ctx, bson.M{"event_uuid": eventUUID}).Decode(&existing)
if errors.Is(findErr, mongo.ErrNoDocuments) {
return nil, errEventNotFound
}
if findErr != nil {
return nil, findErr
}
return nil, errInvalidStatusTransition
}
logger.Info(ctx, "event status transitioned",
"event_uuid", eventUUID,
"from", requiredStatus,
"to", newStatus,
"op", op)
return &updated, nil
}

View File

@ -18,6 +18,7 @@ import (
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/handler"
"eventRT/logger"
"eventRT/middleware"
"eventRT/mq"
@ -108,6 +109,13 @@ func main() {
// c.JSON(200, gin.H{"status": "ok"})
// })
events := engine.Group("/events")
{
events.GET("/:event_uuid", handler.GetEventHandler)
events.PATCH("/:event_uuid/confirm", handler.ConfirmEventHandler)
events.PATCH("/:event_uuid/close", handler.CloseEventHandler)
}
server := http.Server{
Addr: eventRTConfig.ServiceAddr,
Handler: engine,