eventRT/docs/event-flow-analysis.md

142 lines
4.7 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# eventRT 事件流转与存储分析
## 一、整体架构
```bash
modelRT 服务
│ 发布事件 (AMQPS + mTLS)
RabbitMQ (exchange: event-exchange)
│ routing key: event.#.updown.*
Queue: event-up-down-queue
eventRT (消费者)
│ 解析 + 写入
MongoDB (eventdb.alarms)
```
---
## 二、事件数据结构 (`event/event.go`)
| 字段 | 类型 | 说明 |
| :--- | :--- | :--- |
| `event` | string | 事件名称 |
| `event_uuid` | string | 唯一标识符 |
| `type` | int | 事件类型 |
| `priority` | int | 优先级 0-9 |
| `status` | int | 事件状态(见第四节) |
| `category` | string | 可选模板参数 |
| `timestamp` | int64 | 毫秒级 Unix 时间戳 |
| `from` | string | 来源station / platform / msa |
| `condition` | map | 触发条件(如阈值、当前值) |
| `attached_subscriptions` | []any | 关联订阅信息 |
| `result` | map | 事件分析结果 |
| `operations` | []OperationRecord | 操作历史(见第四节) |
| `origin` | map | 子站原始告警数据 (CIM Alarm) |
`OperationRecord` 结构:
```go
Action string // 动作类型,如 "acknowledgment"
Op string // 操作人标识
TS int64 // 操作时间(毫秒)
```
---
## 三、事件流转流程 (`event/up_down_limit_alarm.go`)
```bash
main.go 启动
└─ go event.ReceiptUpDownLimitAlarm(ctx) // goroutine 异步运行
ReceiptUpDownLimitAlarm():
1. GetConn() // 获取全局 RabbitMQ 连接
2. conn.Channel() // 创建 Channel
3. channel.QueueBind( // 绑定路由
queue = "event-up-down-queue",
exchange = "event-exchange",
key = "event.#.updown.*"
)
4. channel.Qos(1, 0, false) // 每次只预取 1 条,防止积压
5. channel.Consume(autoAck=false) // 手动 ACK 模式
6. for { select msg / ctx.Done() } // 事件循环
processAlarmEventMessage(msg):
1. json.Unmarshal → EventRecord // 结构体反序列化
2. InsertOne(ctx, alarmEvent) // ⚠️ 第一次写入(结构体方式)
3. bson.UnmarshalExtJSON → bson.D // 原始 BSON 反序列化
4. InsertOne(ctx, doc) // 第二次写入(原始文档方式)
5. msg.Ack(false) // 手动确认消息
```
---
## 四、事件状态流转
**现状:`Status` 字段有定义,但无状态常量,无状态机逻辑。**
`EventRecord.Status int` + `Operations []OperationRecord` 的设计意图推断,其预期的状态流转应为:
```bash
[未定义初始态]
│ 由 modelRT 发布事件写入
status = ? (活跃/未确认)
│ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间}
status = ? (已确认)
│ 后续处理result 填充等)
status = ? (已关闭/已解决)
```
目前**状态码值没有任何枚举常量定义**`constants/` 包下只有 trace、log_mode、deploy_mode状态机逻辑尚未实现代码有多处 `TODO`)。
---
## 五、存储方式
- **数据库**MongoDB
- **库/集合**`eventdb` / `alarms`
- **写入时机**:消息到达后立即插入,无缓冲
**当前代码存在一个明显问题**`up_down_limit_alarm.go:99-115`
```go
// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段)
mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式)
mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
```
每条消息会**插入两份文档**到 `eventdb.alarms`,且 `msg.Ack` 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。
---
## 六、连接可靠性设计
RabbitMQ 连接有自动重连机制(`mq/mq_init.go:82`):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。
---
## 七、总结
| 维度 | 现状 |
| :--- | :--- |
| 事件流转 | modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB |
| 消息可靠性 | 手动 ACK + QoS=1 + 断连重连 |
| 存储方式 | MongoDB `eventdb.alarms`目前每条消息重复写入两次bug |
| 状态流转 | `Status` + `Operations` 字段已建模,但状态常量和状态机**尚未实现** |
| 主要 TODO | 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑 |