eventRT/docs/event-flow-analysis.md

142 lines
4.7 KiB
Markdown
Raw Permalink Normal View History

# eventRT 事件流转与存储分析
## 一、整体架构
```bash
modelRT 服务
│ 发布事件 (AMQPS + mTLS)
RabbitMQ (exchange: event-exchange)
│ routing key: event.#.updown.*
Queue: event-up-down-queue
eventRT (消费者)
│ 解析 + 写入
MongoDB (eventdb.alarms)
```
---
## 二、事件数据结构 (`event/event.go`)
| 字段 | 类型 | 说明 |
| :--- | :--- | :--- |
| `event` | string | 事件名称 |
| `event_uuid` | string | 唯一标识符 |
| `type` | int | 事件类型 |
| `priority` | int | 优先级 0-9 |
| `status` | int | 事件状态(见第四节) |
| `category` | string | 可选模板参数 |
| `timestamp` | int64 | 毫秒级 Unix 时间戳 |
| `from` | string | 来源station / platform / msa |
| `condition` | map | 触发条件(如阈值、当前值) |
| `attached_subscriptions` | []any | 关联订阅信息 |
| `result` | map | 事件分析结果 |
| `operations` | []OperationRecord | 操作历史(见第四节) |
| `origin` | map | 子站原始告警数据 (CIM Alarm) |
`OperationRecord` 结构:
```go
Action string // 动作类型,如 "acknowledgment"
Op string // 操作人标识
TS int64 // 操作时间(毫秒)
```
---
## 三、事件流转流程 (`event/up_down_limit_alarm.go`)
```bash
main.go 启动
└─ go event.ReceiptUpDownLimitAlarm(ctx) // goroutine 异步运行
ReceiptUpDownLimitAlarm():
1. GetConn() // 获取全局 RabbitMQ 连接
2. conn.Channel() // 创建 Channel
3. channel.QueueBind( // 绑定路由
queue = "event-up-down-queue",
exchange = "event-exchange",
key = "event.#.updown.*"
)
4. channel.Qos(1, 0, false) // 每次只预取 1 条,防止积压
5. channel.Consume(autoAck=false) // 手动 ACK 模式
6. for { select msg / ctx.Done() } // 事件循环
processAlarmEventMessage(msg):
1. json.Unmarshal → EventRecord // 结构体反序列化
2. InsertOne(ctx, alarmEvent) // ⚠️ 第一次写入(结构体方式)
3. bson.UnmarshalExtJSON → bson.D // 原始 BSON 反序列化
4. InsertOne(ctx, doc) // 第二次写入(原始文档方式)
5. msg.Ack(false) // 手动确认消息
```
---
## 四、事件状态流转
**现状:`Status` 字段有定义,但无状态常量,无状态机逻辑。**
`EventRecord.Status int` + `Operations []OperationRecord` 的设计意图推断,其预期的状态流转应为:
```bash
[未定义初始态]
│ 由 modelRT 发布事件写入
status = ? (活跃/未确认)
│ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间}
status = ? (已确认)
│ 后续处理result 填充等)
status = ? (已关闭/已解决)
```
目前**状态码值没有任何枚举常量定义**`constants/` 包下只有 trace、log_mode、deploy_mode状态机逻辑尚未实现代码有多处 `TODO`)。
---
## 五、存储方式
- **数据库**MongoDB
- **库/集合**`eventdb` / `alarms`
- **写入时机**:消息到达后立即插入,无缓冲
**当前代码存在一个明显问题**`up_down_limit_alarm.go:99-115`
```go
// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段)
mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式)
mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
```
每条消息会**插入两份文档**到 `eventdb.alarms`,且 `msg.Ack` 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。
---
## 六、连接可靠性设计
RabbitMQ 连接有自动重连机制(`mq/mq_init.go:82`):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。
---
## 七、总结
| 维度 | 现状 |
| :--- | :--- |
| 事件流转 | modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB |
| 消息可靠性 | 手动 ACK + QoS=1 + 断连重连 |
| 存储方式 | MongoDB `eventdb.alarms`目前每条消息重复写入两次bug |
| 状态流转 | `Status` + `Operations` 字段已建模,但状态常量和状态机**尚未实现** |
| 主要 TODO | 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑 |