142 lines
4.7 KiB
Markdown
142 lines
4.7 KiB
Markdown
|
|
# eventRT 事件流转与存储分析
|
|||
|
|
|
|||
|
|
## 一、整体架构
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
modelRT 服务
|
|||
|
|
│
|
|||
|
|
│ 发布事件 (AMQPS + mTLS)
|
|||
|
|
▼
|
|||
|
|
RabbitMQ (exchange: event-exchange)
|
|||
|
|
│ routing key: event.#.updown.*
|
|||
|
|
▼
|
|||
|
|
Queue: event-up-down-queue
|
|||
|
|
│
|
|||
|
|
▼
|
|||
|
|
eventRT (消费者)
|
|||
|
|
│
|
|||
|
|
│ 解析 + 写入
|
|||
|
|
▼
|
|||
|
|
MongoDB (eventdb.alarms)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 二、事件数据结构 (`event/event.go`)
|
|||
|
|
|
|||
|
|
| 字段 | 类型 | 说明 |
|
|||
|
|
| :--- | :--- | :--- |
|
|||
|
|
| `event` | string | 事件名称 |
|
|||
|
|
| `event_uuid` | string | 唯一标识符 |
|
|||
|
|
| `type` | int | 事件类型 |
|
|||
|
|
| `priority` | int | 优先级 0-9 |
|
|||
|
|
| `status` | int | 事件状态(见第四节) |
|
|||
|
|
| `category` | string | 可选模板参数 |
|
|||
|
|
| `timestamp` | int64 | 毫秒级 Unix 时间戳 |
|
|||
|
|
| `from` | string | 来源:station / platform / msa |
|
|||
|
|
| `condition` | map | 触发条件(如阈值、当前值) |
|
|||
|
|
| `attached_subscriptions` | []any | 关联订阅信息 |
|
|||
|
|
| `result` | map | 事件分析结果 |
|
|||
|
|
| `operations` | []OperationRecord | 操作历史(见第四节) |
|
|||
|
|
| `origin` | map | 子站原始告警数据 (CIM Alarm) |
|
|||
|
|
|
|||
|
|
`OperationRecord` 结构:
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
Action string // 动作类型,如 "acknowledgment"
|
|||
|
|
Op string // 操作人标识
|
|||
|
|
TS int64 // 操作时间(毫秒)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 三、事件流转流程 (`event/up_down_limit_alarm.go`)
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
main.go 启动
|
|||
|
|
└─ go event.ReceiptUpDownLimitAlarm(ctx) // goroutine 异步运行
|
|||
|
|
|
|||
|
|
ReceiptUpDownLimitAlarm():
|
|||
|
|
1. GetConn() // 获取全局 RabbitMQ 连接
|
|||
|
|
2. conn.Channel() // 创建 Channel
|
|||
|
|
3. channel.QueueBind( // 绑定路由
|
|||
|
|
queue = "event-up-down-queue",
|
|||
|
|
exchange = "event-exchange",
|
|||
|
|
key = "event.#.updown.*"
|
|||
|
|
)
|
|||
|
|
4. channel.Qos(1, 0, false) // 每次只预取 1 条,防止积压
|
|||
|
|
5. channel.Consume(autoAck=false) // 手动 ACK 模式
|
|||
|
|
6. for { select msg / ctx.Done() } // 事件循环
|
|||
|
|
|
|||
|
|
processAlarmEventMessage(msg):
|
|||
|
|
1. json.Unmarshal → EventRecord // 结构体反序列化
|
|||
|
|
2. InsertOne(ctx, alarmEvent) // ⚠️ 第一次写入(结构体方式)
|
|||
|
|
3. bson.UnmarshalExtJSON → bson.D // 原始 BSON 反序列化
|
|||
|
|
4. InsertOne(ctx, doc) // 第二次写入(原始文档方式)
|
|||
|
|
5. msg.Ack(false) // 手动确认消息
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 四、事件状态流转
|
|||
|
|
|
|||
|
|
**现状:`Status` 字段有定义,但无状态常量,无状态机逻辑。**
|
|||
|
|
|
|||
|
|
从 `EventRecord.Status int` + `Operations []OperationRecord` 的设计意图推断,其预期的状态流转应为:
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
[未定义初始态]
|
|||
|
|
│
|
|||
|
|
│ 由 modelRT 发布事件写入
|
|||
|
|
▼
|
|||
|
|
status = ? (活跃/未确认)
|
|||
|
|
│
|
|||
|
|
│ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间}
|
|||
|
|
▼
|
|||
|
|
status = ? (已确认)
|
|||
|
|
│
|
|||
|
|
│ 后续处理(result 填充等)
|
|||
|
|
▼
|
|||
|
|
status = ? (已关闭/已解决)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
目前**状态码值没有任何枚举常量定义**,`constants/` 包下只有 trace、log_mode、deploy_mode,状态机逻辑尚未实现(代码有多处 `TODO`)。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 五、存储方式
|
|||
|
|
|
|||
|
|
- **数据库**:MongoDB
|
|||
|
|
- **库/集合**:`eventdb` / `alarms`
|
|||
|
|
- **写入时机**:消息到达后立即插入,无缓冲
|
|||
|
|
|
|||
|
|
**当前代码存在一个明显问题**(`up_down_limit_alarm.go:99-115`):
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段)
|
|||
|
|
mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
|
|||
|
|
|
|||
|
|
// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式)
|
|||
|
|
mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
每条消息会**插入两份文档**到 `eventdb.alarms`,且 `msg.Ack` 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 六、连接可靠性设计
|
|||
|
|
|
|||
|
|
RabbitMQ 连接有自动重连机制(`mq/mq_init.go:82`):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 七、总结
|
|||
|
|
|
|||
|
|
| 维度 | 现状 |
|
|||
|
|
| :--- | :--- |
|
|||
|
|
| 事件流转 | modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB |
|
|||
|
|
| 消息可靠性 | 手动 ACK + QoS=1 + 断连重连 |
|
|||
|
|
| 存储方式 | MongoDB `eventdb.alarms`,目前每条消息重复写入两次(bug) |
|
|||
|
|
| 状态流转 | `Status` + `Operations` 字段已建模,但状态常量和状态机**尚未实现** |
|
|||
|
|
| 主要 TODO | 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑 |
|