eventRT/docs/event-flow-analysis.md

4.7 KiB
Raw Blame History

eventRT 事件流转与存储分析

一、整体架构

modelRT 服务
    │
    │ 发布事件 (AMQPS + mTLS)
    ▼
RabbitMQ (exchange: event-exchange)
    │ routing key: event.#.updown.*
    ▼
Queue: event-up-down-queue
    │
    ▼
eventRT (消费者)
    │
    │ 解析 + 写入
    ▼
MongoDB (eventdb.alarms)

二、事件数据结构 (event/event.go)

字段 类型 说明
event string 事件名称
event_uuid string 唯一标识符
type int 事件类型
priority int 优先级 0-9
status int 事件状态(见第四节)
category string 可选模板参数
timestamp int64 毫秒级 Unix 时间戳
from string 来源station / platform / msa
condition map 触发条件(如阈值、当前值)
attached_subscriptions []any 关联订阅信息
result map 事件分析结果
operations []OperationRecord 操作历史(见第四节)
origin map 子站原始告警数据 (CIM Alarm)

OperationRecord 结构:

Action string  // 动作类型,如 "acknowledgment"
Op     string  // 操作人标识
TS     int64   // 操作时间(毫秒)

三、事件流转流程 (event/up_down_limit_alarm.go)

main.go 启动
  └─ go event.ReceiptUpDownLimitAlarm(ctx)       // goroutine 异步运行

ReceiptUpDownLimitAlarm():
  1. GetConn()                                    // 获取全局 RabbitMQ 连接
  2. conn.Channel()                               // 创建 Channel
  3. channel.QueueBind(                           // 绑定路由
       queue    = "event-up-down-queue",
       exchange = "event-exchange",
       key      = "event.#.updown.*"
     )
  4. channel.Qos(1, 0, false)                     // 每次只预取 1 条,防止积压
  5. channel.Consume(autoAck=false)               // 手动 ACK 模式
  6. for { select msg / ctx.Done() }              // 事件循环

processAlarmEventMessage(msg):
  1. json.Unmarshal → EventRecord                 // 结构体反序列化
  2. InsertOne(ctx, alarmEvent)                   // ⚠️ 第一次写入(结构体方式)
  3. bson.UnmarshalExtJSON → bson.D               // 原始 BSON 反序列化
  4. InsertOne(ctx, doc)                          // 第二次写入(原始文档方式)
  5. msg.Ack(false)                               // 手动确认消息

四、事件状态流转

现状:Status 字段有定义,但无状态常量,无状态机逻辑。

EventRecord.Status int + Operations []OperationRecord 的设计意图推断,其预期的状态流转应为:

[未定义初始态]
      │
      │ 由 modelRT 发布事件写入
      ▼
   status = ? (活跃/未确认)
      │
      │ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间}status = ? (已确认)
      │
      │ 后续处理result 填充等)
      ▼
   status = ? (已关闭/已解决)

目前状态码值没有任何枚举常量定义constants/ 包下只有 trace、log_mode、deploy_mode状态机逻辑尚未实现代码有多处 TODO)。


五、存储方式

  • 数据库MongoDB
  • 库/集合eventdb / alarms
  • 写入时机:消息到达后立即插入,无缓冲

当前代码存在一个明显问题up_down_limit_alarm.go:99-115

// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段)
mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)

// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式)
mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)

每条消息会插入两份文档eventdb.alarms,且 msg.Ack 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。


六、连接可靠性设计

RabbitMQ 连接有自动重连机制(mq/mq_init.go:82):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。


七、总结

维度 现状
事件流转 modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB
消息可靠性 手动 ACK + QoS=1 + 断连重连
存储方式 MongoDB eventdb.alarms目前每条消息重复写入两次bug
状态流转 Status + Operations 字段已建模,但状态常量和状态机尚未实现
主要 TODO 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑