Compare commits

..

22 Commits

Author SHA1 Message Date
douxu c20b36373a feat: add MongoDB K8s deployment manifests and expand deploy guide
- add mongodb StatefulSet, Service, PVC, and Secret manifests for Minikube
  - restructure deploy.md MongoDB section into Docker (1.1) and K8s (1.2) paths
  - add connection verification, cleanup, and teardown sections to deploy guide
  - document local go build/run workflow (section 5.2-5.4)
  - reference eventrt-certs-secret.sh helper and Pod readiness waits
  - fix MongoDB password env var name in docs (INITDB_ROOT_PASSWORD)
  - set imagePullPolicy IfNotPresent on rabbitmq deployment for local images
2026-06-11 16:20:39 +08:00
douxu 4c57c37c26 refactor: sync logger improvements from modelrt to eventrt
- add LokiConfig struct and Loki field to LoggerConfig for dev direct-push
  - replace getEncoder/getLogWriter with mode-aware encoder and getWriteSyncer
    (colored console in dev, JSON in container; stdout instead of stderr)
  - add containerFields() to inject K8s pod/namespace/node as global log fields
  - add loki_syncer.go: async batched push to Loki with 512-entry channel
  - introduce makeLogFieldsSkip/getLoggerCallerInfoSkip for wrapper call-frame support
  - expose ErrorSkip/WarnSkip/InfoSkip facade functions for skip-frame logging
  - add loki.endpoint placeholder to eventrt-configmap.yaml for self-documentation
2026-06-08 13:58:48 +08:00
douxu 04d81cedce chore: update k8s deploy config for local Minikube dev environment
- add eventrt-certs-secret.sh helper script for creating cert secret
  - update configmap: use K8s service DNS names for mongodb and jaeger
  - switch deploy_env to development and image to local eventrt:v1
  - add explicit command entrypoint to deployment manifest
  - update mongodb credentials to match local dev setup
2026-06-05 16:38:48 +08:00
douxu 582a64ad20 docs: expand eventRT image deploy guide and bump Go to 1.26
- add three-stage build table (builder/certs/runtime) to section 3.1
  - add --build-arg USER_ID option for custom non-root UID
  - add method 2: load pre-built local image (e.g. eventrt:v1) into Minikube
  - add smoke test subsection 3.1.1 with size/inspect/run checks
  - bump base image from golang:1.25-alpine to golang:1.26-alpine
  - remove bundled config.example.yaml from image (config should be mounted at runtime)
2026-06-02 10:28:31 +08:00
douxu c7dec53ded docs: expand deploy guide to cover full project setup
- add MongoDB Docker deployment steps with init and auth setup
  - restructure RabbitMQ section with step-by-step cert generation and K8s deploy
  - add EventRT Kubernetes deployment section (build, secrets, configmap, checks)
  - document SSH tunnel setup for Mac local dev (MongoDB, RabbitMQ, OTel, Jaeger)
  - add config.yaml parameter reference table for local development
  - add troubleshooting section for cert checks, TLS handshake, and MongoDB connectivity
2026-05-29 14:20:02 +08:00
douxu 2a1929a180 feat: add Kubernetes deployment manifests and hardened Dockerfile
- add multi-stage scratch-based Dockerfile with non-root user and mTLS cert support
  - add K8s Deployment, Service, ConfigMap, and Secret manifests with security hardening
  - bind MONGODB_PASSWORD and SERVICE_SECRET_KEY from environment variables via viper
  - restructure deploy/ directory and remove unused modelrt.cnf
  - bump Go version to 1.26.3 and add event-flow-analysis doc
2026-05-29 11:09:03 +08:00
douxu 715183994c fix: remove status mutation side effect in PublishEventToUI
- stop overwriting record.Status inside the publish function
  - status should be managed by the caller, not the publisher
2026-05-11 17:35:43 +08:00
douxu 2f7cd5a8fc feat: add event lifecycle API and UI fanout publishing - extract event status, MQ exchange/queue, and DB name constants to constants/event.go - add bson tags and IsPersisted flag to EventRecord for proper MongoDB serialization
- publish persisted alarm events to UI consumers via RabbitMQ fanout exchange                                                 - add GET /events/:event_uuid handler to query a single event from MongoDB
  - add PATCH confirm/close handlers with atomic FindOneAndUpdate status transitions                                            - register /events routes in main.go
2026-05-08 15:43:54 +08:00
douxu 44ded93411 feat: add OpenTelemetry distributed tracing to eventRT - add OtelConfig to EventRTConfig and configs/config.yaml - add middleware/trace.go with InitTracerProvider (W3C TraceContext) and StartTrace gin middleware
- migrate logger to read traceID/spanID from OTel span context instead of ctx string keys
  - extract upstream W3C trace context from AMQP headers in processAlarmEventMessage to chain spans across modelRT→eventRT
  - add per-message span in processAlarmEventMessage for end-to-end trace visibility
2026-05-07 16:32:53 +08:00
douxu e0863eb04a add code of rabbitmq message ack action 2026-03-03 17:09:31 +08:00
douxu 609511c7cd optimize code of receipt event 2026-03-02 17:00:56 +08:00
douxu d239f9bd85 add code of receipt event and store into mongodb 2026-02-27 16:36:40 +08:00
douxu f333a830a1 optimize file name of modelrt cert config with rabbitmq 2026-02-12 17:10:32 +08:00
jessequ a3e5892379 updated README.md 2026-02-11 18:12:56 +08:00
jessequ b2a8180f64 updated README.md 2026-02-11 18:11:31 +08:00
douxu 8930f3a20e optimize code and add md file of deploy rabbitMQ server with eventRT 2026-02-11 16:41:08 +08:00
douxu 722e8a9c0f optimize code of rabbitmq connection 2026-02-06 17:55:30 +08:00
douxu fff5dd9218 optimize code of rabbitmq deploy 2026-02-05 17:04:46 +08:00
douxu 23bc2dab9f add rabbitmq deploy files 2026-02-04 17:43:52 +08:00
douxu 5653fb0719 optimze of rabbitmq and mongodb code 2026-01-30 17:43:16 +08:00
douxu c7bc0d033b add drone config 2026-01-29 16:56:21 +08:00
douxu 9380717b75 add code of db or mq init 2026-01-29 16:54:00 +08:00
47 changed files with 3357 additions and 1 deletions

12
.drone.yml Normal file
View File

@ -0,0 +1,12 @@
kind: pipeline
type: docker
name: default
steps:
- name: build
image: golang:latest
environment:
GO111MODULE: on
GOPROXY: https://goproxy.cn,direct
commands:
- go build main.go

8
.gitignore vendored
View File

@ -21,3 +21,11 @@
# Go workspace file # Go workspace file
go.work go.work
# vscode files
.vscode/
# Shield all log files in the log folder
/log/
# Shield config files in the configs folder
/configs/**/*.yaml
/configs/**/*.pem

View File

@ -1,3 +1,5 @@
# eventRT # eventRT
事件处理服务 事件处理服务
[![Build Status](http://192.168.46.100:4080/api/badges/CL-Softwares/eventRT/status.svg?ref=refs/heads/feature-joinDebuggingDemo)](http://192.168.46.100:4080/CL-Softwares/eventRT)

109
config/config.go Normal file
View File

@ -0,0 +1,109 @@
// Package config define config struct of model runtime service
package config
import (
"fmt"
"net/url"
"github.com/spf13/viper"
)
// LokiConfig define config struct of loki direct-push (used in development mode)
type LokiConfig struct {
Endpoint string `mapstructure:"endpoint"` // empty disables direct push
Labels map[string]string `mapstructure:"labels"`
}
// LoggerConfig define config struct of zap logger config
type LoggerConfig struct {
Mode string `mapstructure:"mode"`
Level string `mapstructure:"level"`
FilePath string `mapstructure:"filepath"` // empty disables file rotation in container modes
MaxSize int `mapstructure:"maxsize"`
MaxBackups int `mapstructure:"maxbackups"`
MaxAge int `mapstructure:"maxage"`
Compress bool `mapstructure:"compress"`
Loki LokiConfig `mapstructure:"loki"`
}
// MongoDBConfig define config struct of mongoDB config
type MongoDBConfig struct {
Host string `mapstructure:"host"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
AuthDB string `mapstructure:"auth_db"`
Port int `mapstructure:"port"`
Timeout int `mapstructure:"timeout"`
}
type RabbitMQConfig struct {
CACertPath string `mapstructure:"ca_cert_path"`
ClientKeyPath string `mapstructure:"client_key_path"`
ClientKeyPassword string `mapstructure:"client_key_password"`
ClientCertPath string `mapstructure:"client_cert_path"`
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
ServerName string `mapstructure:"server_name"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
}
// ServiceConfig define config struct of service config
type ServiceConfig struct {
ServiceAddr string `mapstructure:"service_addr"`
ServiceName string `mapstructure:"service_name"`
SecretKey string `mapstructure:"secret_key"`
DeployEnv string `mapstructure:"deploy_env"`
}
// OtelConfig define config struct of OpenTelemetry tracing
type OtelConfig struct {
Endpoint string `mapstructure:"endpoint"` // e.g. "localhost:4318"
Insecure bool `mapstructure:"insecure"`
}
// EventRTConfig define config struct of model runtime server
type EventRTConfig struct {
ServiceConfig `mapstructure:"service"`
LoggerConfig `mapstructure:"logger"`
MongoDBConfig `mapstructure:"mongodb"`
RabbitMQConfig `mapstructure:"rabbitMQ"`
OtelConfig OtelConfig `mapstructure:"otel"`
MongoDBURI string
}
// ReadAndInitConfig return eventRT project config struct
func ReadAndInitConfig(configDir, configName, configType string) (eventRTConfig EventRTConfig) {
config := viper.New()
config.AddConfigPath(configDir)
config.SetConfigName(configName)
config.SetConfigType(configType)
if err := config.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
panic(fmt.Sprintf("can not find conifg file:%s\n", err.Error()))
}
panic(err)
}
config.BindEnv("mongodb.password", "MONGODB_PASSWORD")
config.BindEnv("service.secret_key", "SERVICE_SECRET_KEY")
if err := config.Unmarshal(&eventRTConfig); err != nil {
panic(fmt.Sprintf("unmarshal eventRT config failed:%s\n", err.Error()))
}
if eventRTConfig.MongoDBConfig.Timeout <= 0 {
eventRTConfig.MongoDBConfig.Timeout = 10
}
if eventRTConfig.MongoDBConfig.AuthDB == "" {
eventRTConfig.MongoDBConfig.AuthDB = "admin"
}
// init mongoDB uri
user := url.QueryEscape(eventRTConfig.MongoDBConfig.User)
password := url.QueryEscape(eventRTConfig.MongoDBConfig.Password)
eventRTConfig.MongoDBURI = fmt.Sprintf("mongodb://%s:%s@%s:%d/?authSource=%s", user, password, eventRTConfig.MongoDBConfig.Host, eventRTConfig.MongoDBConfig.Port, eventRTConfig.MongoDBConfig.AuthDB)
return eventRTConfig
}

11
constants/deploy_mode.go Normal file
View File

@ -0,0 +1,11 @@
// Package constants define constant variable
package constants
const (
// DevelopmentDeployMode define development operator environment for eventRT project
DevelopmentDeployMode = "development"
// DebugDeployMode define debug operator environment for eventRT project
DebugDeployMode = "debug"
// ProductionDeployMode define production operator environment for eventRT project
ProductionDeployMode = "production"
)

29
constants/event.go Normal file
View File

@ -0,0 +1,29 @@
// Package constants define constant variable
package constants
const (
// EventStatusHappened define status for event record when event just happened, no data attached yet
EventStatusHappened = iota
// EventStatusDataAttached define status for event record when event data attached, ready to be sent
EventStatusDataAttached
// EventStatusReported define status for event record when event reported to downstream
EventStatusReported
// EventStatusConfirmed define status for event record when event confirmed by operator or CIM
EventStatusConfirmed
// EventStatusClosed define status for event record when event closed due to condition recovery or manual close
EventStatusClosed
)
const (
// EventUIExchangeName define exchange name for pushing events to UI consumers
EventUIExchangeName = "event-ui-exchange"
// EventUIQueueName define queue name for UI consumers to subscribe to events
EventUIQueueName = "event-ui-queue"
)
const (
// EventDBName define MongoDB database name for event storage
EventDBName = "eventdb"
// EventCollectionName define MongoDB collection name for alarm event records
EventCollectionName = "alarms"
)

11
constants/log_mode.go Normal file
View File

@ -0,0 +1,11 @@
// Package constants define constant variable
package constants
const (
// DevelopmentLogMode define development operator environment for eventRT project
DevelopmentLogMode = "development"
// DebugLogMode define debug operator environment for eventRT project
DebugLogMode = "debug"
// ProductionLogMode define production operator environment for eventRT project
ProductionLogMode = "production"
)

21
constants/trace.go Normal file
View File

@ -0,0 +1,21 @@
// Package constants define constant variable
package constants
// Internal context keys for trace values set by StartTrace middleware.
// These are gin/stdlib context keys only — actual W3C header propagation
// (traceparent / tracestate) is handled automatically by the OTel propagator.
const (
HeaderTraceID = "trace-id"
HeaderSpanID = "span-id"
HeaderParentSpanID = "parent-span-id"
)
// traceCtxKey is an unexported type for context keys to avoid collisions with other packages.
type traceCtxKey string
// Typed context keys for trace values — use these with context.WithValue / ctx.Value.
var (
CtxKeyTraceID = traceCtxKey(HeaderTraceID)
CtxKeySpanID = traceCtxKey(HeaderSpanID)
CtxKeyParentSpanID = traceCtxKey(HeaderParentSpanID)
)

56
database/mongo_init.go Normal file
View File

@ -0,0 +1,56 @@
// Package database define database operation functions
package database
import (
"context"
"sync"
"time"
"eventRT/config" //
"eventRT/logger"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
mongoOnce sync.Once
_globalMongoClient *mongo.Client
)
// GetMongoClient returns the global MongoDB client
func GetMongoClient() *mongo.Client {
return _globalMongoClient
}
// InitMongoInstance return instance of MongoDB client
func InitMongoInstance(ctx context.Context, eCfg config.EventRTConfig) *mongo.Client {
mongoOnce.Do(func() {
_globalMongoClient = initMongoClient(ctx, eCfg.MongoDBURI, eCfg.Timeout)
})
return _globalMongoClient
}
// initMongoClient return successfully initialized MongoDB client
func initMongoClient(ctx context.Context, mongoDBURI string, timeout int) *mongo.Client {
clientOptions := options.Client().ApplyURI(mongoDBURI)
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
logger.Error(ctx, "failed to connect to MongoDB", "error", err)
panic(err)
}
pingTimeout := time.Duration(timeout) * time.Second
if pingTimeout == 0 {
pingTimeout = 10 * time.Second
}
pingCtx, cancel := context.WithTimeout(ctx, pingTimeout)
defer cancel()
if err := client.Ping(pingCtx, nil); err != nil {
logger.Error(ctx, "failed to ping operation with MongoDB", "error", err)
panic(err)
}
return client
}

689
deploy/deploy.md Normal file
View File

@ -0,0 +1,689 @@
# 项目部署指南
本项目依赖 `MongoDB`(事件持久化存储)与 `RabbitMQ`mTLS 消息队列),并通过 `OpenTelemetry + Jaeger` 完成链路追踪。
## 前提条件
1. 已安装 `Docker``kubectl`
2. `Minikube` 集群已启动并可访问
3. 确保以下端口在宿主机上未被占用:`27017`MongoDB、`5671`RabbitMQ AMQP
---
### 1\. 部署 MongoDB 数据库
EventRT 支持两种 MongoDB 部署方式,根据场景二选一:**Docker**(本地开发 / Ubuntu 宿主机直跑)或 **K8s**Minikube 环境)。
#### 1.1 Docker 部署(本地开发)
使用官方 `mongo:7.0` 镜像,在 Ubuntu 宿主机(`192.168.1.101`)上以 Docker 容器运行。
##### 1.1.1 部署命令
```bash
docker run --name mongodb \
-e MONGO_INITDB_ROOT_USERNAME=coslight \
-e MONGO_INITDB_ROOT_PASSWORD=coslight@tj \
-p 27017:27017 \
-d mongo:7.0
```
##### 1.1.2 连接信息
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **容器名称** | `mongodb` | 容器名 |
| **镜像版本** | `mongo:7.0` | MongoDB 7.0 |
| **主机端口** | `27017` | 外部应用连接端口 |
| **用户名** | `coslight` | Root 管理员 |
| **密码** | `coslight@tj` | 启动时通过 `MONGO_INITDB_ROOT_PASSWORD` 设置 |
| **鉴权数据库** | `admin` | `auth_db` |
| **业务数据库** | `eventdb` | EventRT 事件存储库 |
##### 1.1.3 状态检查
```bash
# 检查容器启动状态
docker ps -a | grep mongodb
# 检查启动日志
docker logs mongodb
```
> **注意:** 密码当前以明文形式写在 `docker run` 命令中,生产环境应通过 Docker Secret 或环境变量文件(`--env-file`)传入,避免在 Shell 历史记录中留存明文密码。
##### 1.1.4 连接验证
```bash
# 快速检查 MongoDB 是否接受连接
docker exec -it mongodb mongosh \
-u coslight -p "coslight@tj" --authenticationDatabase admin \
--eval "db.adminCommand({ ping: 1 })"
# 列出所有数据库(确认服务正常)
docker exec -it mongodb mongosh \
-u coslight -p "coslight@tj" --authenticationDatabase admin \
--eval "show dbs"
```
##### 1.1.5 初始化 eventdb 数据库
MongoDB 启动后进入容器,为 `eventdb` 库授权:
```bash
docker exec -it mongodb mongosh \
-u coslight -p "coslight@tj" --authenticationDatabase admin
```
`mongosh` 中执行:
```javascript
// 切换到 eventdb若不存在则自动创建
use eventdb
// 授予 coslight 用户对 eventdb 的读写权限(根用户已有全库权限,此步可选)
db.createUser({
user: "coslight",
pwd: "coslight@tj",
roles: [{ role: "readWrite", db: "eventdb" }]
})
```
#### 1.2 K8s 部署Minikube
YAML 文件位于 `deploy/k8s/`(从 modelrt 仓库迁移而来,需确认文件已拷贝至本项目)。
```bash
kubectl apply -f deploy/k8s/mongodb-secret.yaml
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
kubectl apply -f deploy/k8s/mongodb-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
| **NodePort** | `30017` | 集群外访问端口 |
| **用户名** | `admin` | Root 管理员Secret `mongodb-secret` |
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
| **存储** | `2Gi` | PVC `mongodb-data` |
| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 |
| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 |
> **注意:** 密码存储在 `mongodb-secret.yaml``stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
##### 1.2.1 等待 Pod 就绪
```bash
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
```
##### 1.2.2 连接验证
```bash
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
-- mongosh -u admin -p coslight --authenticationDatabase admin \
--eval "db.adminCommand({ ping: 1 })"
```
##### 1.2.3 初始化 eventdb 数据库
```bash
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
-- mongosh -u admin -p coslight --authenticationDatabase admin
```
`mongosh` 中执行:
```javascript
use eventdb
db.createUser({
user: "coslight",
pwd: "coslight@tj",
roles: [{ role: "readWrite", db: "eventdb" }]
})
```
##### 1.2.4 状态检查
```bash
kubectl get pods -l app=mongodb
kubectl logs -l app=mongodb --tail=30
```
---
### 2\. 部署 RabbitMQKubernetes
RabbitMQ 配置为仅允许 TLS 连接(`listeners.tcp = none`),所有客户端须持有由同一 CA 签发的证书。YAML 文件位于 `deploy/mq/`
#### 2.1 TLS 证书生成
##### 2.1.1 生成根 CA
```bash
git clone https://github.com/rabbitmq/tls-gen.git
cd tls-gen/basic
# 生成根 CA结果输出至 result/
make CN=rabbitmq-server
# ca_certificate.pem 与 ca_key.pem即 cakey.pem生成于 result/
```
##### 2.1.2 生成服务端证书
服务端证书需包含 SAN使其同时匹配集群内 DNS 和 Minikube IP。使用 `deploy/mq/server.conf`(已预置正确配置):
```bash
# 将 ca_certificate.pem 和 cakey.pem 放在当前目录
openssl genrsa -out server_key.pem 2048
openssl req -new -key server_key.pem \
-out server_cert.csr -config deploy/mq/server.conf
openssl x509 -req -in server_cert.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out server_certificate.pem -days 730 -sha256 \
-extfile deploy/mq/server.conf -extensions v3_server
rm server_cert.csr
```
##### 2.1.3 生成 EventRT 客户端证书
CN 必须与 RabbitMQ 中注册的用户名一致(`eventrt-client`)。使用 `deploy/mq/eventrt.conf`
```bash
openssl genrsa -out eventrt_client_key.pem 2048
openssl req -new -key eventrt_client_key.pem \
-out eventrt_client.csr -config deploy/mq/eventrt.conf
openssl x509 -req -in eventrt_client.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out eventrt_client_cert.pem -days 365 \
-extensions v3_client -extfile deploy/mq/eventrt.conf
rm eventrt_client.csr
```
##### 2.1.4 生成 ModelRT 客户端证书
使用 `deploy/mq/modelrt.conf`CN 为 `modelrt-client`
```bash
openssl genrsa -out modelrt_client_key.pem 2048
openssl req -new -key modelrt_client_key.pem \
-out modelrt_client.csr -config deploy/mq/modelrt.conf
openssl x509 -req -in modelrt_client.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out modelrt_client_cert.pem -days 365 \
-extensions v3_client -extfile deploy/mq/modelrt.conf
rm modelrt_client.csr
```
##### 2.1.5 验证证书
```bash
# 验证服务端证书
openssl verify -CAfile ca_certificate.pem server_certificate.pem
# 验证客户端证书
openssl verify -CAfile ca_certificate.pem eventrt_client_cert.pem
openssl verify -CAfile ca_certificate.pem modelrt_client_cert.pem
# 确认 CN 和扩展clientAuth / serverAuth
openssl x509 -in server_certificate.pem -noout -subject -ext subjectAltName
openssl x509 -in eventrt_client_cert.pem -noout -subject -text | grep -A1 "Extended Key Usage"
openssl x509 -in modelrt_client_cert.pem -noout -subject
```
#### 2.2 部署 RabbitMQ
##### 2.2.1 创建证书 Secret
在证书文件所在目录执行:
```bash
sh deploy/mq/secert.sh
```
该脚本等价于:
```bash
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./certs/ca_certificate.pem \
--from-file=server_certificate.pem=./certs/server_certificate.pem \
--from-file=server_key.pem=./certs/server_key.pem
```
##### 2.2.2 部署
```bash
kubectl apply -f deploy/mq/rabbitmq-secret.yaml
kubectl apply -f deploy/mq/rabbitmq-users-config.yaml
kubectl apply -f deploy/mq/rabbitmq-config.yaml
kubectl apply -f deploy/mq/rabbitmq-deployment.yaml
kubectl apply -f deploy/mq/rabbitmq-service.yaml
```
##### 2.2.3 端口汇总
| 端口 | NodePort | 说明 |
| :--- | :--- | :--- |
| `5671` | `30671` | AMQP over TLS客户端连接 |
| `5672` | `30672` | AMQP 明文(内部备用,生产禁用) |
| `15671` | `31671` | Management UI over TLS |
| `15672` | `31672` | Management UI 明文(内部备用) |
##### 2.2.4 用户与权限说明
用户定义在 `rabbitmq-users-config.yaml``definitions.json` 中,启动时通过 `load_definitions` 自动加载:
| 用户 | 认证方式 | 权限 | 说明 |
| :--- | :--- | :--- | :--- |
| `coslight` | 密码 | administrator | 管理员,密码在 `rabbitmq-secret.yaml` |
| `eventrt-client` | X.509 证书CN | configure/read/write | EventRT 服务专用 |
| `modelrt-client` | X.509 证书CN | configure/read/write | ModelRT 服务专用 |
| `web-client` | X.509 证书CN | read/write | Web 客户端 |
> **注意:** 证书认证用户的 `password_hash` 留空RabbitMQ 通过 `ssl_cert_login_from = common_name` 将证书 CN 映射为用户名。
---
### 3\. 部署 EventRTKubernetes
所有资源部署在 `default` 命名空间YAML 文件位于 `deploy/k8s/`
#### 3.1 构建并推送镜像
镜像采用三阶段构建,最终基于 `scratch`
| 阶段 | 基础镜像 | 作用 |
| :--- | :--- | :--- |
| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0``-trimpath -ldflags="-s -w"` |
| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义UID 默认 `1000` |
| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 |
**方式一:从源码构建并加载**
```bash
# 在项目根目录执行(默认运行用户 UID=1000
docker build -f deploy/dockerfile/eventrt.Dockerfile -t coslight/eventrt:latest .
# 自定义运行用户 UID
docker build -f deploy/dockerfile/eventrt.Dockerfile \
--build-arg USER_ID=2000 \
-t coslight/eventrt:latest .
# 加载到 Minikube无需私有仓库
minikube image load coslight/eventrt:latest
```
**方式二:直接加载已有本地镜像**
Ubuntu 宿主机上已存在构建好的镜像(如 `eventrt:v1`)时,无需重新构建,直接导入 Minikube
```bash
# 确认本地镜像存在
docker images eventrt:v1
# 加载到 Minikube
minikube image load eventrt:v1
# 验证镜像已进入 Minikube 缓存
minikube image ls | grep eventrt
```
> **注意:** `deploy/k8s/eventrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。
#### 3.1.1 镜像冒烟测试
```bash
# 查看镜像大小scratch 镜像预期 ≤ 25 MB
docker images coslight/eventrt:latest
# 检查镜像元信息(确认 User、Cmd、架构
docker inspect coslight/eventrt:latest
# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常)
docker run --rm coslight/eventrt:latest
# 挂载示例配置做完整启动验证Ctrl+C 退出)
docker run --rm \
-v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \
-p 8081:8081 \
coslight/eventrt:latest
```
> **注意:** `scratch` 镜像不含 shell无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`
#### 3.2 创建客户端证书 Secret
在 RabbitMQ TLS 证书生成完成后(见 2.1),进入证书文件所在目录执行:
```bash
sh deploy/k8s/eventrt-certs-secret.sh
```
该脚本等价于:
```bash
kubectl create secret generic eventrt-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=eventrt_client_cert.pem=./eventrt_client_cert.pem \
--from-file=eventrt_client_key.pem=./eventrt_client_key.pem
```
#### 3.3 部署
```bash
kubectl apply -f deploy/k8s/eventrt-secret.yaml
kubectl apply -f deploy/k8s/eventrt-configmap.yaml
kubectl apply -f deploy/k8s/eventrt-deployment.yaml
kubectl apply -f deploy/k8s/eventrt-service.yaml
```
等待 Pod 就绪:
```bash
kubectl wait --for=condition=ready pod -l app=eventrt --timeout=120s
```
#### 3.4 配置说明
| 配置项 | 方式 | 说明 |
| :--- | :--- | :--- |
| `mongodb.password` | Secret `eventrt-secret` | 不写入 ConfigMap通过环境变量 `MONGODB_PASSWORD` 注入 |
| `service.secret_key` | Secret `eventrt-secret` | 不写入 ConfigMap通过环境变量 `SERVICE_SECRET_KEY` 注入 |
| RabbitMQ 客户端证书 | Secret `eventrt-certs` | 挂载至 `/app/configs/certs/` |
| `config.yaml` 其余配置 | ConfigMap `eventrt-config` | `rabbitmq.host` 已设为 K8s Service 名 `rabbitmq-service` |
| `K8S_NAMESPACE` / `K8S_NODE_NAME` | Downward API | 注入至日志全局字段 |
#### 3.5 状态检查
```bash
# 查看 Pod 状态
kubectl get pods -l app=eventrt
# 查看启动日志
kubectl logs -l app=eventrt --tail=50
# 查看 Service
kubectl get svc eventrt-service
```
#### 3.6 端口汇总
| NodePort | 说明 |
| :--- | :--- |
| `30081` | EventRT HTTP API |
#### 3.7 清理
```bash
kubectl delete -f deploy/k8s/eventrt-service.yaml \
-f deploy/k8s/eventrt-deployment.yaml \
-f deploy/k8s/eventrt-configmap.yaml \
-f deploy/k8s/eventrt-secret.yaml
kubectl delete secret eventrt-certs
```
---
### 4\. Mac 本地访问SSH 隧道)
`EventRT` 在 Mac 本地运行时RabbitMQ 部署在 Ubuntu 宿主机(`192.168.1.101`)的 Minikube 中MongoDB 直接运行在宿主机上。需通过 SSH 本地端口转发建立访问隧道。
#### 4.1 网络拓扑
```text
Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101) ──▶ Minikube NodePort (192.168.49.2)
└──▶ MongoDB Docker (宿主机 27017)
```
#### 4.2 建立隧道
```bash
ssh -L 27017:127.0.0.1:27017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 4318:192.168.49.2:31318 \
-L 16686:192.168.49.2:31686 \
douxu@192.168.1.101
```
如需后台静默运行(不占用终端):
```bash
ssh -fN \
-L 27017:127.0.0.1:27017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 4318:192.168.49.2:31318 \
-L 16686:192.168.49.2:31686 \
douxu@192.168.1.101
```
#### 4.3 端口映射说明
| Mac 本地端口 | 目标 | 服务 | 说明 |
| :--- | :--- | :--- | :--- |
| `27017` | 宿主机 `127.0.0.1:27017` | MongoDB | EventRT 事件数据库 |
| `5671` | Minikube `192.168.49.2:30671` | RabbitMQ AMQP | 消息队列 mTLS 连接 |
| `15671` | Minikube `192.168.49.2:31671` | RabbitMQ Management | 管理界面 `http://localhost:15671` |
| `4318` | Minikube `192.168.49.2:31318` | OTLP HTTP | OTel Trace 上报Jaeger Collector |
| `16686` | Minikube `192.168.49.2:31686` | Jaeger UI | 链路追踪查询 `http://localhost:16686` |
> **注意:** 隧道建立后,本地 `config.yaml` 中所有服务地址填 `localhost:<本地端口>` 即可直接运行服务。
#### 4.4 关闭隧道
前台运行时直接 `Ctrl+C`;后台运行时查找并终止进程:
```bash
# 找到 ssh 隧道进程
ps aux | grep "ssh -fN"
# 终止(替换为实际 PID
kill <PID>
```
---
### 5\. 本地运行go run / 二进制)
#### 5.1 配置服务配置文件
`configs/config.example.yaml` 复制为 `configs/config.yaml` 并按以下说明调整:
##### 5.1.1 配置参数说明
| 类别 | 参数名 | 作用描述 | 示例值 |
| :--- | :--- | :--- | :--- |
| **MongoDB** | `host` | MongoDB 服务器地址 | `"localhost"` |
| | `port` | MongoDB 端口 | `27017` |
| | `user` | 连接用户名 | `"coslight"` |
| | `password` | 连接密码 | `"coslight@tj"` |
| | `database` | 业务数据库名 | `"eventdb"` |
| | `auth_db` | 鉴权数据库 | `"admin"` |
| | `timeout` | 连接超时(秒) | `10` |
| **RabbitMQ** | `host` | RabbitMQ 服务器地址 | `"localhost"` |
| | `port` | AMQP over TLS 端口 | `5671` |
| | `server_name` | TLS SNI / 证书 CN | `"rabbitmq-server"` |
| | `ca_cert_path` | CA 证书路径 | `"./configs/certs/ca_certificate.pem"` |
| | `client_cert_path` | 客户端证书路径 | `"./configs/certs/eventrt_client_cert.pem"` |
| | `client_key_path` | 客户端私钥路径 | `"./configs/certs/eventrt_client_key.pem"` |
| | `insecure_skip_verify` | 是否跳过证书校验(开发临时用) | `false` |
| **Logger (Zap)** | `mode` | 日志模式 `development` / `production` | `"development"` |
| | `level` | 最低日志级别 | `"debug"` |
| | `filepath` | 日志文件路径(空字符串输出到 stdout | `""` |
| | `maxsize` | 单文件最大大小MB | `1` |
| | `maxbackups` | 保留旧文件最大个数 | `5` |
| | `maxage` | 保留旧文件最大天数 | `30` |
| | `compress` | 是否压缩备份文件 | `false` |
| **Service** | `service_addr` | HTTP 监听地址 | `":8081"` |
| | `service_name` | 服务名(日志/监控标识) | `"eventRT"` |
| | `secret_key` | 内部签名密钥 | `"eventrt_key"` |
| | `deploy_env` | 部署环境 `development` / `production` | `"development"` |
| **OTel** | `endpoint` | OTLP HTTP 上报地址(不含协议前缀) | `"localhost:4318"` |
| | `insecure` | 是否不启用 TLS | `true` |
#### 5.2 编译 EventRT 服务
```bash
go build -o eventrt main.go
```
#### 5.3 启动服务
```bash
./eventrt
```
#### 5.4 检测服务启动日志
控制台输出 `starting EventRT server` 后即代表服务启动成功。
---
### 6\. 排查手册
#### 6.1 证书权限检查
确认客户端证书包含 `TLS Web Client Authentication`
```bash
openssl x509 -in configs/certs/eventrt_client_cert.pem -noout -text \
| grep -A1 "Extended Key Usage"
```
预期输出包含 `TLS Web Client Authentication`
#### 6.2 握手连通性验证
```bash
openssl s_client -connect localhost:5671 \
-cert configs/certs/eventrt_client_cert.pem \
-key configs/certs/eventrt_client_key.pem \
-CAfile configs/certs/ca_certificate.pem
```
预期结果:看到 `Verify return code: 0 (ok)`
#### 6.3 RabbitMQ 日志检查
连接成功后RabbitMQ 日志应出现:
```
connection <xxx>: user 'eventrt-client' authenticated and granted access to vhost '/'
```
查看 Pod 日志:
```bash
kubectl logs -l app=rabbitmq --tail=50 | grep eventrt-client
```
#### 6.4 MongoDB 连通性验证
```bash
mongosh "mongodb://coslight:coslight@tj@localhost:27017/eventdb?authSource=admin"
```
预期进入 `mongosh` 提示符,执行 `show collections` 无报错。
---
### 7\. 后续操作(停止与清理)
#### 7.1 本地 Docker 部署清理
适用于第 1 节使用 `docker run` 启动的 MongoDB 容器。
```bash
# 停止容器
docker stop mongodb
# 删除容器(容器内数据将同步丢失)
docker rm mongodb
```
#### 7.2 本地 go run 运行清理
适用于第 5 节以 `go run` 或编译后二进制方式在本地启动的 EventRT 服务。
前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程:
```bash
# 终止 go run 启动的进程
pkill -f "go run main.go"
# 或终止编译后的二进制进程
pkill eventrt
```
#### 7.3 K8s(Minikube) 部署清理
适用于第 1.2、2、3 节在 Minikube 中部署的所有资源。
##### 7.3.1 分服务清理
**仅停止(缩容至 0PVC 数据与 Secret 保留)**
将所有 Deployment 和 StatefulSet 缩容至 0 副本Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。
```bash
# 停止所有 DeploymentEventRT / RabbitMQ
kubectl scale deployment eventrt rabbitmq --replicas=0
# 停止 MongoDB StatefulSetPVC 数据保留)
kubectl scale statefulset mongodb --replicas=0
```
恢复时:
```bash
kubectl scale deployment eventrt rabbitmq --replicas=1
kubectl scale statefulset mongodb --replicas=1
```
---
**永久清理(删除所有资源,数据不可恢复)**
按部署顺序反向删除各服务资源:
```bash
# EventRT 应用
kubectl delete -f deploy/k8s/eventrt-service.yaml \
-f deploy/k8s/eventrt-deployment.yaml \
-f deploy/k8s/eventrt-configmap.yaml \
-f deploy/k8s/eventrt-secret.yaml
kubectl delete secret eventrt-certs
# MongoDB
kubectl delete -f deploy/k8s/mongodb-service.yaml \
-f deploy/k8s/mongodb-statefulset.yaml \
-f deploy/k8s/mongodb-pvc.yaml \
-f deploy/k8s/mongodb-secret.yaml
# RabbitMQ
kubectl delete -f deploy/mq/rabbitmq-service.yaml \
-f deploy/mq/rabbitmq-deployment.yaml \
-f deploy/mq/rabbitmq-users-config.yaml \
-f deploy/mq/rabbitmq-config.yaml \
-f deploy/mq/rabbitmq-secret.yaml
kubectl delete secret rabbitmq-certs
```
##### 7.3.2 一键清理
> **注意:** 此操作会删除 `deploy/k8s/``deploy/mq/` 下所有 YAML 对应的 K8s 资源,请确认后执行。
```bash
kubectl delete -f deploy/k8s/ -f deploy/mq/
kubectl delete secret eventrt-certs rabbitmq-certs
```

View File

@ -0,0 +1,34 @@
FROM golang:1.26-alpine AS builder
RUN apk --no-cache upgrade
WORKDIR /app
COPY go.mod go.sum ./
RUN GOPROXY="https://goproxy.cn,direct" go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build \
-ldflags="-s -w" \
-trimpath \
-mod=readonly \
-o eventrt main.go
# prepare runtime dependencies in a pinned alpine stage so they can be
# copied into scratch without pulling any vulnerable os packages at run time.
FROM alpine:3.21 AS certs
ARG USER_ID=1000
RUN apk --no-cache add ca-certificates tzdata && \
adduser -D -u ${USER_ID} eventrt
FROM scratch
# CA certificates required for TLS connections (RabbitMQ amqps://)
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# timezone data
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
# non-root user/group definitions
COPY --from=certs /etc/passwd /etc/passwd
COPY --from=certs /etc/group /etc/group
WORKDIR /app
COPY --from=builder /app/eventrt ./eventrt
USER eventrt
CMD ["/app/eventrt", "-eventRT_config_dir=/app/configs"]

View File

@ -0,0 +1,14 @@
#!/bin/sh
# Create the eventrt client certificate secret.
# Run this script from the directory that contains the three cert files,
# or adjust the paths below to point at the actual files.
#
# Expected files (generated during RabbitMQ TLS setup):
# ca_certificate.pem
# eventrt_client_cert.pem
# eventrt_client_key.pem
kubectl create secret generic eventrt-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=eventrt_client_cert.pem=./eventrt_client_cert.pem \
--from-file=eventrt_client_key.pem=./eventrt_client_key.pem

View File

@ -0,0 +1,47 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: eventrt-config
data:
config.yaml: |
rabbitmq:
ca_cert_path: "/app/configs/certs/ca_certificate.pem"
client_key_path: "/app/configs/certs/eventrt_client_key.pem"
client_key_password: ""
client_cert_path: "/app/configs/certs/eventrt_client_cert.pem"
insecure_skip_verify: false
server_name: "rabbitmq-server"
user: ""
password: ""
host: "rabbitmq-service"
port: 5671
mongodb:
host: "mongodb-service"
port: 27017
user: "admin"
password: "" # injected via env MONGODB_PASSWORD
database: "eventdb"
auth_db: "admin"
timeout: 10
logger:
mode: "production"
level: "info"
filepath: ""
maxsize: 100
maxbackups: 5
maxage: 30
compress: false
loki:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
service:
service_addr: ":8081"
service_name: "eventRT"
secret_key: "" # injected via env SERVICE_SECRET_KEY
deploy_env: "development"
otel:
endpoint: "jaeger-service:4318"
insecure: true

View File

@ -0,0 +1,91 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: eventrt
labels:
app: eventrt
spec:
replicas: 1
selector:
matchLabels:
app: eventrt
template:
metadata:
labels:
app: eventrt
spec:
containers:
- name: eventrt
image: eventrt:v1
imagePullPolicy: IfNotPresent
command: ["/app/eventrt"]
args:
- "-eventRT_config_dir=/app/configs"
- "-eventRT_config_name=config"
- "-eventRT_config_type=yaml"
ports:
- containerPort: 8081
env:
# Downward API — injected into every log line by logger
- name: K8S_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
# HOSTNAME is set automatically by K8s to the pod name
# Sensitive values injected from Secret so they stay out of ConfigMap
- name: MONGODB_PASSWORD
valueFrom:
secretKeyRef:
name: eventrt-secret
key: mongodb-password
- name: SERVICE_SECRET_KEY
valueFrom:
secretKeyRef:
name: eventrt-secret
key: secret-key
volumeMounts:
- name: config
mountPath: /app/configs/config.yaml
subPath: config.yaml
readOnly: true
- name: certs
mountPath: /app/configs/certs
readOnly: true
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
securityContext:
runAsUser: 1000
runAsNonRoot: true
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
livenessProbe:
tcpSocket:
port: 8081
initialDelaySeconds: 10
periodSeconds: 30
failureThreshold: 3
readinessProbe:
tcpSocket:
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 3
volumes:
- name: config
configMap:
name: eventrt-config
- name: certs
secret:
secretName: eventrt-certs

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: eventrt-secret
type: Opaque
stringData:
mongodb-password: "coslight"
secret-key: "eventrt_key"

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: eventrt-service
labels:
app: eventrt
spec:
type: NodePort
selector:
app: eventrt
ports:
- name: http
port: 8081
targetPort: 8081
nodePort: 30081

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mongodb-data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: mongodb-secret
type: Opaque
stringData:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: coslight

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: mongodb-service
labels:
app: mongodb
spec:
type: NodePort
selector:
app: mongodb
ports:
- name: mongodb
port: 27017
targetPort: 27017
nodePort: 30017

View File

@ -0,0 +1,61 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: mongodb
labels:
app: mongodb
spec:
serviceName: mongodb
replicas: 1
selector:
matchLabels:
app: mongodb
template:
metadata:
labels:
app: mongodb
spec:
containers:
- name: mongodb
image: mongo:7.0
imagePullPolicy: IfNotPresent
ports:
- name: mongodb
containerPort: 27017
envFrom:
- secretRef:
name: mongodb-secret
volumeMounts:
- name: mongodb-data
mountPath: /data/db
readinessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 10
failureThreshold: 12
livenessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 120
periodSeconds: 10
timeoutSeconds: 30
failureThreshold: 5
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
volumes:
- name: mongodb-data
persistentVolumeClaim:
claimName: mongodb-data

14
deploy/mq/client.conf Normal file
View File

@ -0,0 +1,14 @@
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = web-client
[v3_client]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth

14
deploy/mq/eventrt.conf Normal file
View File

@ -0,0 +1,14 @@
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = eventrt-client
[v3_client]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth

14
deploy/mq/modelrt.conf Normal file
View File

@ -0,0 +1,14 @@
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = modelrt-client
[v3_client]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth

1
deploy/mq/plugins.sh Normal file
View File

@ -0,0 +1 @@
kubectl create configmap rabbit-plugins-conf --from-literal=enabled_plugins="[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch]."

View File

@ -0,0 +1,33 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-config
data:
rabbitmq.conf: |
# 确保允许PLAIN认证
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.3 = EXTERNAL
# 允许admin用户通过远程方式连接
loopback_users.admin = false
# 默认心跳和监听配置可在此扩展
# 确定 ssl 连接时验证使用的用户名
ssl_cert_login_from = common_name
# 开启此项配置会导致只能通过TLS端口访问
listeners.tcp = none
listeners.ssl.default = 5671
# default user config
load_definitions = /etc/rabbitmq/definitions.json
# ssl config
ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/certs/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/certs/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# management config
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
management.ssl.certfile = /etc/rabbitmq/certs/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/certs/server_key.pem
management.ssl.verify = verify_peer
management.ssl.fail_if_no_peer_cert = true

View File

@ -0,0 +1,76 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: eventrt-rabbitmq
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:4.1.1-management-alpine
imagePullPolicy: IfNotPresent
ports:
- containerPort: 4369
- containerPort: 5671
- containerPort: 5672 # AMQP
- containerPort: 15671
- containerPort: 15672 # Management UI
- containerPort: 15691
- containerPort: 15692
- containerPort: 25672
env:
- name: RABBITMQ_DEFAULT_USER
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: rabbitmq-user
- name: RABBITMQ_DEFAULT_PASS
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: rabbitmq-pass
- name: RABBITMQ_ERLANG_COOKIE
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: erlang-cookie
- name: RABBITMQ_DEFAULT_VHOST
value: "/"
volumeMounts:
- name: rabbitmq-certs-volume
mountPath: /etc/rabbitmq/certs
readOnly: true
- name: rabbitmq-config-volume
mountPath: /etc/rabbitmq/rabbitmq.conf
subPath: rabbitmq.conf
readOnly: true
- name: plugins-config-volume
mountPath: /etc/rabbitmq/enabled_plugins
subPath: enabled_plugins
- name: users-config-volume
mountPath: /etc/rabbitmq/definitions.json
subPath: definitions.json
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumes:
- name: rabbitmq-certs-volume
secret:
secretName: rabbitmq-certs
- name: rabbitmq-config-volume
configMap:
name: rabbitmq-config
- name: plugins-config-volume
configMap:
name: rabbit-plugins-conf
- name: users-config-volume
configMap:
name: rabbitmq-users-definitions
- name: rabbitmq-data
emptyDir: {}

View File

@ -0,0 +1,9 @@
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-secret
type: Opaque
stringData:
rabbitmq-user: "coslight"
rabbitmq-pass: "coslight@tj"
erlang-cookie: "secret-erlang-cookie"

View File

@ -0,0 +1,29 @@
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-service
spec:
type: NodePort # 在 Minikube 中使用 NodePort 方便外部访问
selector:
app: rabbitmq
ports:
- name: amqp-ssl
protocol: TCP
port: 5671
targetPort: 5671
nodePort: 30671
- name: amqp
protocol: TCP
port: 5672
targetPort: 5672
nodePort: 30672
- name: management-ssl
protocol: TCP
port: 15671
targetPort: 15671
nodePort: 31671
- name: management
protocol: TCP
port: 15672
targetPort: 15672
nodePort: 31672

View File

@ -0,0 +1,77 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-users-definitions
data:
definitions.json: |
{
"users": [
{
"name": "coslight",
"password_hash": "Gl2XVEJwPwDZQF8ZhsYnvm83wMkdftY3/raxyntdZueyx/Uv",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": ["administrator"]
},
{
"name": "web-client",
"password_hash": "",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": ["management"]
},
{
"name": "modelrt-client",
"password_hash": "",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": ["management"]
},
{
"name": "eventrt-client",
"password_hash": "",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": ["management"]
}
],
"vhosts": [ { "name": "/" } ],
"permissions": [
{
"user": "coslight",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "web-client",
"vhost": "/",
"configure": "^$",
"write": ".*",
"read": ".*"
},
{
"user": "modelrt-client",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "eventrt-client",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"topic_permissions": [],
"parameters": [],
"global_parameters": [
{
"name": "cluster_name",
"value": "evnetrt-rabbitmq-cluster"
}
],
"policies": [],
"queues": [],
"exchanges": [],
"bindings": []
}

4
deploy/mq/secert.sh Normal file
View File

@ -0,0 +1,4 @@
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./certs/ca_certificate.pem \
--from-file=server_certificate.pem=./certs/server_certificate.pem \
--from-file=server_key.pem=./certs/server_key.pem

22
deploy/mq/server.conf Normal file
View File

@ -0,0 +1,22 @@
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = rabbitmq-server
[v3_server]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names
[alt_names]
DNS.1 = rabbitmq-server
DNS.2 = rabbitmq-service.default.svc.cluster.local
DNS.3 = localhost
IP.1 = 192.168.49.2
IP.2 = 127.0.0.1

141
docs/event-flow-analysis.md Normal file
View File

@ -0,0 +1,141 @@
# eventRT 事件流转与存储分析
## 一、整体架构
```bash
modelRT 服务
│ 发布事件 (AMQPS + mTLS)
RabbitMQ (exchange: event-exchange)
│ routing key: event.#.updown.*
Queue: event-up-down-queue
eventRT (消费者)
│ 解析 + 写入
MongoDB (eventdb.alarms)
```
---
## 二、事件数据结构 (`event/event.go`)
| 字段 | 类型 | 说明 |
| :--- | :--- | :--- |
| `event` | string | 事件名称 |
| `event_uuid` | string | 唯一标识符 |
| `type` | int | 事件类型 |
| `priority` | int | 优先级 0-9 |
| `status` | int | 事件状态(见第四节) |
| `category` | string | 可选模板参数 |
| `timestamp` | int64 | 毫秒级 Unix 时间戳 |
| `from` | string | 来源station / platform / msa |
| `condition` | map | 触发条件(如阈值、当前值) |
| `attached_subscriptions` | []any | 关联订阅信息 |
| `result` | map | 事件分析结果 |
| `operations` | []OperationRecord | 操作历史(见第四节) |
| `origin` | map | 子站原始告警数据 (CIM Alarm) |
`OperationRecord` 结构:
```go
Action string // 动作类型,如 "acknowledgment"
Op string // 操作人标识
TS int64 // 操作时间(毫秒)
```
---
## 三、事件流转流程 (`event/up_down_limit_alarm.go`)
```bash
main.go 启动
└─ go event.ReceiptUpDownLimitAlarm(ctx) // goroutine 异步运行
ReceiptUpDownLimitAlarm():
1. GetConn() // 获取全局 RabbitMQ 连接
2. conn.Channel() // 创建 Channel
3. channel.QueueBind( // 绑定路由
queue = "event-up-down-queue",
exchange = "event-exchange",
key = "event.#.updown.*"
)
4. channel.Qos(1, 0, false) // 每次只预取 1 条,防止积压
5. channel.Consume(autoAck=false) // 手动 ACK 模式
6. for { select msg / ctx.Done() } // 事件循环
processAlarmEventMessage(msg):
1. json.Unmarshal → EventRecord // 结构体反序列化
2. InsertOne(ctx, alarmEvent) // ⚠️ 第一次写入(结构体方式)
3. bson.UnmarshalExtJSON → bson.D // 原始 BSON 反序列化
4. InsertOne(ctx, doc) // 第二次写入(原始文档方式)
5. msg.Ack(false) // 手动确认消息
```
---
## 四、事件状态流转
**现状:`Status` 字段有定义,但无状态常量,无状态机逻辑。**
`EventRecord.Status int` + `Operations []OperationRecord` 的设计意图推断,其预期的状态流转应为:
```bash
[未定义初始态]
│ 由 modelRT 发布事件写入
status = ? (活跃/未确认)
│ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间}
status = ? (已确认)
│ 后续处理result 填充等)
status = ? (已关闭/已解决)
```
目前**状态码值没有任何枚举常量定义**`constants/` 包下只有 trace、log_mode、deploy_mode状态机逻辑尚未实现代码有多处 `TODO`)。
---
## 五、存储方式
- **数据库**MongoDB
- **库/集合**`eventdb` / `alarms`
- **写入时机**:消息到达后立即插入,无缓冲
**当前代码存在一个明显问题**`up_down_limit_alarm.go:99-115`
```go
// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段)
mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent)
// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式)
mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc)
```
每条消息会**插入两份文档**到 `eventdb.alarms`,且 `msg.Ack` 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。
---
## 六、连接可靠性设计
RabbitMQ 连接有自动重连机制(`mq/mq_init.go:82`):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。
---
## 七、总结
| 维度 | 现状 |
| :--- | :--- |
| 事件流转 | modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB |
| 消息可靠性 | 手动 ACK + QoS=1 + 断连重连 |
| 存储方式 | MongoDB `eventdb.alarms`目前每条消息重复写入两次bug |
| 状态流转 | `Status` + `Operations` 字段已建模,但状态常量和状态机**尚未实现** |
| 主要 TODO | 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑 |

41
event/event.go Normal file
View File

@ -0,0 +1,41 @@
// Package event define real time data evnet operation functions
package event
// EventRecord define struct for CIM event record
type EventRecord struct {
// 事件名称
EventName string `json:"event" bson:"event"`
// 事件唯一标识符
EventUUID string `json:"event_uuid" bson:"event_uuid"`
// 事件类型
Type int `json:"type" bson:"type"`
// 事件优先级 (0-9)
Priority int `json:"priority" bson:"priority"`
// 事件状态
Status int `json:"status" bson:"status"`
// 是否已持久化到数据库,由 eventRT 消费并落库后置为 true
IsPersisted bool `json:"is_persisted" bson:"is_persisted"`
// 可选模板参数
Category string `json:"category,omitempty" bson:"category,omitempty"`
// 毫秒级时间戳 (Unix epoch)
Timestamp int64 `json:"timestamp" bson:"timestamp"`
// 事件来源 (station, platform, msa)
From string `json:"from" bson:"from"`
// 事件场景描述对象 (如阈值、当前值)
Condition map[string]any `json:"condition" bson:"condition"`
// 与事件相关的订阅信息
AttachedSubscriptions []any `json:"attached_subscriptions" bson:"attached_subscriptions"`
// 事件分析结果对象
Result map[string]any `json:"result,omitempty" bson:"result,omitempty"`
// 操作历史记录 (CIM ActivityRecord)
Operations []OperationRecord `json:"operations" bson:"operations"`
// 子站告警原始数据 (CIM Alarm 数据)
Origin map[string]any `json:"origin,omitempty" bson:"origin,omitempty"`
}
// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等
type OperationRecord struct {
Action string `json:"action" bson:"action"` // 执行的动作,如 "acknowledgment"
Op string `json:"op" bson:"op"` // 操作人/操作账号标识
TS int64 `json:"ts" bson:"ts"` // 操作发生的毫秒时间戳
}

81
event/publish_ui_event.go Normal file
View File

@ -0,0 +1,81 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"encoding/json"
"time"
"eventRT/constants"
"eventRT/logger"
"eventRT/mq"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
)
// initUIEventChannel declares the fanout exchange and queue used by UI consumers,
// then returns a channel ready for publishing.
func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) {
ch, err := mq.GetConn().Channel()
if err != nil {
logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err)
return nil, err
}
err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event exchange failed", "error", err)
return nil, err
}
_, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event queue failed", "error", err)
return nil, err
}
// fanout exchange routes to all bound queues regardless of routing key
err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil)
if err != nil {
logger.Error(ctx, "bind UI event queue to exchange failed", "error", err)
return nil, err
}
return ch, nil
}
// PublishEventToUI sets the event status to Reported and publishes the record
// to the UI-facing fanout exchange. Intended to be called after the event has
// been successfully persisted to the database.
func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error {
body, err := json.Marshal(record)
if err != nil {
logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err)
return err
}
headers := amqp.Table{}
otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers))
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = ch.PublishWithContext(pubCtx,
constants.EventUIExchangeName,
"", // fanout exchange ignores routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
Headers: headers,
})
if err != nil {
logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err)
return err
}
logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status)
return nil
}

View File

@ -0,0 +1,153 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"encoding/json"
"eventRT/constants"
"eventRT/database"
"eventRT/logger"
"eventRT/mq"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier for trace context extraction.
type amqpHeaderCarrier amqp.Table
func (c amqpHeaderCarrier) Get(key string) string {
val, ok := amqp.Table(c)[key]
if !ok {
return ""
}
str, _ := val.(string)
return str
}
func (c amqpHeaderCarrier) Set(key, value string) {
amqp.Table(c)[key] = value
}
func (c amqpHeaderCarrier) Keys() []string {
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
return keys
}
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}
const (
queueName = "event-up-down-queue"
exchangeName = "event-exchange"
)
// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm
func ReceiptUpDownLimitAlarm(ctx context.Context) {
conn := mq.GetConn()
if conn == nil {
logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed")
return
}
channel, err := conn.Channel()
if err != nil {
logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err)
return
}
defer channel.Close()
uiChannel, err := initUIEventChannel(ctx)
if err != nil {
logger.Error(ctx, "init UI event channel failed", "error", err)
return
}
defer uiChannel.Close()
err = channel.QueueBind(
queueName, // 队列名
"event.#.updown.*",
exchangeName, // 交换机名
false,
nil,
)
if err != nil {
logger.Error(ctx, "channel bind queue and exchange failed", "error", err)
return
}
err = channel.Qos(1, 0, false)
if err != nil {
logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err)
}
// registered consumer
msgs, err := channel.Consume(
queueName, // queue
"", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logger.Error(ctx, "failed to register a consumer", "error", err)
return
}
logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ")
for {
select {
case <-ctx.Done():
logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel")
return
case msg, ok := <-msgs:
if !ok {
logger.Error(ctx, "message channel closed, exiting consumer loop")
return
}
processAlarmEventMessage(ctx, msg, uiChannel)
}
}
}
func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) {
// extract upstream trace context injected by modelRT into AMQP headers
ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers))
ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage")
defer span.End()
logger.Info(ctx, "received event alarm from modelRT up and down limit compute process")
mongodbClient := database.GetMongoClient()
var alarmEvent EventRecord
if err := json.Unmarshal(msg.Body, &alarmEvent); err != nil {
logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
return
}
alarmEvent.IsPersisted = true
alarmEvent.Status = constants.EventStatusReported
result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent)
if err != nil {
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
return
}
logger.Info(ctx, "alarm event inserted into database", "result", result)
if err := msg.Ack(false); err != nil {
logger.Error(ctx, "ack alarm event message failed", "error", err)
return
}
if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil {
logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err)
}
}

83
go.mod Normal file
View File

@ -0,0 +1,83 @@
module eventRT
go 1.26.3
require (
github.com/gin-gonic/gin v1.11.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/rabbitmq/amqp091-go v1.10.0
github.com/spf13/viper v1.21.0
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
go.mongodb.org/mongo-driver v1.17.9
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/zap v1.27.1
)
require (
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/bytedance/sonic v1.14.0 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.10.0 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.20.0 // indirect
golang.org/x/crypto v0.49.0 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
golang.org/x/tools v0.42.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/grpc v1.80.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

215
go.sum Normal file
View File

@ -0,0 +1,215 @@
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk=
github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik=
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw=
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U=
github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I=
github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU=
github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA=
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.17.9 h1:IexDdCuuNJ3BHrELgBlyaH9p60JXAvdzWR128q+U5tU=
go.mongodb.org/mongo-driver v1.17.9/go.mod h1:LlOhpH5NUEfhxcAwG0UEkMqwYcc4JU18gtCdGudk/tQ=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak=
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g=
go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA=
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

40
handler/event_query.go Normal file
View File

@ -0,0 +1,40 @@
// Package handler define HTTP handler functions for eventRT service
package handler
import (
"errors"
"net/http"
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// GetEventHandler handles GET /events/:event_uuid
func GetEventHandler(c *gin.Context) {
ctx := c.Request.Context()
eventUUID := c.Param("event_uuid")
var record event.EventRecord
err := database.GetMongoClient().
Database(constants.EventDBName).
Collection(constants.EventCollectionName).
FindOne(ctx, bson.M{"event_uuid": eventUUID}).
Decode(&record)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
c.JSON(http.StatusNotFound, gin.H{"error": "event not found"})
return
}
logger.Error(ctx, "query event by uuid failed", "event_uuid", eventUUID, "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
return
}
c.JSON(http.StatusOK, record)
}

133
handler/event_status.go Normal file
View File

@ -0,0 +1,133 @@
// Package handler define HTTP handler functions for eventRT service
package handler
import (
"context"
"errors"
"net/http"
"time"
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
errEventNotFound = errors.New("event not found")
errInvalidStatusTransition = errors.New("invalid status transition: precondition status mismatch")
)
type updateStatusRequest struct {
Op string `json:"op" binding:"required"`
}
// ConfirmEventHandler handles PATCH /events/:event_uuid/confirm
// Transitions event status from EventStatusReported to EventStatusConfirmed.
func ConfirmEventHandler(c *gin.Context) {
handleStatusTransition(c,
constants.EventStatusReported,
constants.EventStatusConfirmed,
"confirm",
)
}
// CloseEventHandler handles PATCH /events/:event_uuid/close
// Transitions event status from EventStatusConfirmed to EventStatusClosed.
func CloseEventHandler(c *gin.Context) {
handleStatusTransition(c,
constants.EventStatusConfirmed,
constants.EventStatusClosed,
"close",
)
}
func handleStatusTransition(c *gin.Context, requiredStatus, newStatus int, action string) {
ctx := c.Request.Context()
eventUUID := c.Param("event_uuid")
var req updateStatusRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "field op is required"})
return
}
updated, err := transitionEventStatus(ctx, eventUUID, req.Op, requiredStatus, newStatus, action)
if err != nil {
switch {
case errors.Is(err, errEventNotFound):
c.JSON(http.StatusNotFound, gin.H{"error": "event not found"})
case errors.Is(err, errInvalidStatusTransition):
c.JSON(http.StatusConflict, gin.H{
"error": "invalid status transition",
"required_status": requiredStatus,
})
default:
logger.Error(ctx, "update event status failed",
"event_uuid", eventUUID,
"action", action,
"error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
}
return
}
c.JSON(http.StatusOK, updated)
}
// transitionEventStatus atomically validates the precondition status and applies the transition.
// Uses FindOneAndUpdate with a status filter so the check and update are a single MongoDB operation.
func transitionEventStatus(ctx context.Context, eventUUID, op string, requiredStatus, newStatus int, action string) (*event.EventRecord, error) {
col := database.GetMongoClient().
Database(constants.EventDBName).
Collection(constants.EventCollectionName)
opRecord := event.OperationRecord{
Action: action,
Op: op,
TS: time.Now().UnixNano() / int64(time.Millisecond),
}
filter := bson.M{
"event_uuid": eventUUID,
"status": requiredStatus,
}
update := bson.M{
"$set": bson.M{"status": newStatus},
"$push": bson.M{"operations": opRecord},
}
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var updated event.EventRecord
err := col.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updated)
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
// FindOneAndUpdate returned no documents: either the event doesn't exist
// or the current status doesn't match the required precondition.
// Do a follow-up read to distinguish the two cases.
var existing event.EventRecord
findErr := col.FindOne(ctx, bson.M{"event_uuid": eventUUID}).Decode(&existing)
if errors.Is(findErr, mongo.ErrNoDocuments) {
return nil, errEventNotFound
}
if findErr != nil {
return nil, findErr
}
return nil, errInvalidStatusTransition
}
logger.Info(ctx, "event status transitioned",
"event_uuid", eventUUID,
"from", requiredStatus,
"to", newStatus,
"op", op)
return &updated, nil
}

73
logger/facede.go Normal file
View File

@ -0,0 +1,73 @@
// Package logger define log struct of eventRT project
package logger
import (
"context"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
f *facade
fOnce sync.Once
)
type facade struct {
_logger *zap.Logger
}
// Debug define facade func of debug level log
func Debug(ctx context.Context, msg string, kv ...any) {
logFacade().log(ctx, zapcore.DebugLevel, msg, kv...)
}
// Info define facade func of info level log
func Info(ctx context.Context, msg string, kv ...any) {
logFacade().log(ctx, zapcore.InfoLevel, msg, kv...)
}
// Warn define facade func of warn level log
func Warn(ctx context.Context, msg string, kv ...any) {
logFacade().log(ctx, zapcore.WarnLevel, msg, kv...)
}
// Error define facade func of error level log
func Error(ctx context.Context, msg string, kv ...any) {
logFacade().log(ctx, zapcore.ErrorLevel, msg, kv...)
}
func (f *facade) log(ctx context.Context, lvl zapcore.Level, msg string, kv ...any) {
f.logSkip(ctx, lvl, 0, msg, kv...)
}
func (f *facade) logSkip(ctx context.Context, lvl zapcore.Level, extraSkip int, msg string, kv ...any) {
fields := makeLogFieldsSkip(ctx, extraSkip, kv...)
ce := f._logger.Check(lvl, msg)
ce.Write(fields...)
}
// ErrorSkip logs at error level with extra caller skip frames for wrapper functions.
func ErrorSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.ErrorLevel, extraSkip, msg, kv...)
}
// WarnSkip logs at warn level with extra caller skip frames for wrapper functions.
func WarnSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.WarnLevel, extraSkip, msg, kv...)
}
// InfoSkip logs at info level with extra caller skip frames for wrapper functions.
func InfoSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.InfoLevel, extraSkip, msg, kv...)
}
func logFacade() *facade {
fOnce.Do(func() {
f = &facade{
_logger: GetLoggerInstance(),
}
})
return f
}

114
logger/logger.go Normal file
View File

@ -0,0 +1,114 @@
// Package logger define log struct of eventRT project
package logger
import (
"context"
"path"
"runtime"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Logger is the interface returned by New for structured, trace-aware logging.
type Logger interface {
Debug(msg string, kv ...any)
Info(msg string, kv ...any)
Warn(msg string, kv ...any)
Error(msg string, kv ...any)
}
type logger struct {
ctx context.Context
_logger *zap.Logger
}
func (l *logger) Debug(msg string, kv ...any) {
l.log(zapcore.DebugLevel, msg, kv...)
}
func (l *logger) Info(msg string, kv ...any) {
l.log(zapcore.InfoLevel, msg, kv...)
}
func (l *logger) Warn(msg string, kv ...any) {
l.log(zapcore.WarnLevel, msg, kv...)
}
func (l *logger) Error(msg string, kv ...any) {
l.log(zapcore.ErrorLevel, msg, kv...)
}
func (l *logger) log(lvl zapcore.Level, msg string, kv ...any) {
fields := makeLogFields(l.ctx, kv...)
ce := l._logger.Check(lvl, msg)
ce.Write(fields...)
}
func makeLogFields(ctx context.Context, kv ...any) []zap.Field {
return makeLogFieldsSkip(ctx, 0, kv...)
}
func makeLogFieldsSkip(ctx context.Context, extraSkip int, kv ...any) []zap.Field {
if len(kv)%2 != 0 {
kv = append(kv, "unknown")
}
spanCtx := trace.SpanFromContext(ctx).SpanContext()
traceID := spanCtx.TraceID().String()
spanID := spanCtx.SpanID().String()
kv = append(kv, "traceID", traceID, "spanID", spanID)
funcName, file, line := getLoggerCallerInfoSkip(extraSkip)
kv = append(kv, "func", funcName, "file", file, "line", line)
fields := make([]zap.Field, 0, len(kv)/2)
for i := 0; i < len(kv); i += 2 {
key := kv[i].(string)
value := kv[i+1]
switch v := value.(type) {
case string:
fields = append(fields, zap.String(key, v))
case int:
fields = append(fields, zap.Int(key, v))
case int64:
fields = append(fields, zap.Int64(key, v))
case float32:
fields = append(fields, zap.Float32(key, v))
case float64:
fields = append(fields, zap.Float64(key, v))
case bool:
fields = append(fields, zap.Bool(key, v))
case error:
fields = append(fields, zap.Error(v))
default:
fields = append(fields, zap.Any(key, v))
}
}
return fields
}
// getLoggerCallerInfo returns caller info at a fixed depth for the standard facade call chain.
func getLoggerCallerInfo() (funcName, file string, line int) {
return getLoggerCallerInfoSkip(0)
}
// getLoggerCallerInfoSkip returns caller info with additional skip frames beyond the standard depth.
func getLoggerCallerInfoSkip(extraSkip int) (funcName, file string, line int) {
pc, file, line, ok := runtime.Caller(4 + extraSkip)
if !ok {
return
}
file = path.Base(file)
funcName = runtime.FuncForPC(pc).Name()
return
}
// New returns a logger bound to ctx. Trace fields (traceID, spanID) are extracted
// from the OTel span stored in ctx and included in every log entry.
func New(ctx context.Context) Logger {
return &logger{
ctx: ctx,
_logger: GetLoggerInstance(),
}
}

132
logger/loki_syncer.go Normal file
View File

@ -0,0 +1,132 @@
// Package logger define log struct of eventRT project
package logger
import (
"bytes"
"context"
"encoding/json"
"fmt"
"maps"
"net/http"
"os"
"strconv"
"sync"
"time"
"eventRT/config"
)
type lokiPushRequest struct {
Streams []lokiStream `json:"streams"`
}
type lokiStream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
}
// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them
// to Loki's push API asynchronously. Errors are silently dropped so a unreachable
// Loki instance never blocks or crashes the application.
type lokiSyncer struct {
endpoint string
labels map[string]string
client *http.Client
ch chan string
wg sync.WaitGroup
closeOnce sync.Once
}
func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
// always tag development logs with env=development; caller-supplied labels override if needed
labels := map[string]string{"env": "development"}
maps.Copy(labels, lCfg.Labels)
ls := &lokiSyncer{
endpoint: lCfg.Endpoint + "/loki/api/v1/push",
labels: labels,
client: &http.Client{Timeout: 5 * time.Second},
ch: make(chan string, 512),
}
ls.wg.Add(1)
go ls.run()
return ls
}
func (ls *lokiSyncer) Write(p []byte) (int, error) {
select {
case ls.ch <- string(p):
default:
// channel full: drop the line rather than block the caller
}
return len(p), nil
}
// Sync flushes remaining buffered lines and shuts down the background goroutine.
// Called by zap.Logger.Sync() at application shutdown.
func (ls *lokiSyncer) Sync() error {
ls.closeOnce.Do(func() { close(ls.ch) })
ls.wg.Wait()
return nil
}
func (ls *lokiSyncer) run() {
defer ls.wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var batch []string
flush := func() {
if len(batch) == 0 {
return
}
ls.push(batch)
batch = batch[:0]
}
for {
select {
case line, ok := <-ls.ch:
if !ok {
flush()
return
}
batch = append(batch, line)
if len(batch) >= 100 {
flush()
}
case <-ticker.C:
flush()
}
}
}
func (ls *lokiSyncer) push(lines []string) {
ts := strconv.FormatInt(time.Now().UnixNano(), 10)
values := make([][2]string, len(lines))
for i, line := range lines {
values[i] = [2]string{ts, line}
}
body, err := json.Marshal(lokiPushRequest{
Streams: []lokiStream{{Stream: ls.labels, Values: values}},
})
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := ls.client.Do(req)
if err != nil {
fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err)
return
}
defer resp.Body.Close()
}

124
logger/zap.go Normal file
View File

@ -0,0 +1,124 @@
// Package logger define log struct of eventRT project
package logger
import (
"fmt"
"os"
"sync"
"time"
"eventRT/config"
"eventRT/constants"
"github.com/natefinch/lumberjack"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
once sync.Once
_globalLoggerMu sync.RWMutex
_globalLogger *zap.Logger
)
// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder
// for container modes (parseable by Promtail pipeline_stages).
func getEncoder(mode string) zapcore.Encoder {
cfg := zap.NewProductionEncoderConfig()
cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
cfg.TimeKey = "time"
cfg.EncodeCaller = zapcore.ShortCallerEncoder
if mode == constants.DevelopmentLogMode {
cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
return zapcore.NewConsoleEncoder(cfg)
}
cfg.EncodeLevel = zapcore.CapitalLevelEncoder
return zapcore.NewJSONEncoder(cfg)
}
// getWriteSyncer returns write targets based on mode:
// - development: stdout + optional Loki direct-push (when loki.endpoint is set)
// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set)
func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer {
stdout := zapcore.AddSync(os.Stdout)
if lCfg.Mode == constants.DevelopmentLogMode {
if lCfg.Loki.Endpoint == "" {
return stdout
}
return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki))
}
syncers := []zapcore.WriteSyncer{stdout}
if lCfg.FilePath != "" {
dateStr := time.Now().Format("2006-01-02 15:04:05")
syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{
Filename: fmt.Sprintf(lCfg.FilePath, dateStr),
MaxSize: lCfg.MaxSize,
MaxAge: lCfg.MaxAge,
MaxBackups: lCfg.MaxBackups,
Compress: lCfg.Compress,
}))
}
return zapcore.NewMultiWriteSyncer(syncers...)
}
// containerFields reads K8s Downward API environment variables and returns them as global zap fields.
// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node.
// Inject them in the Deployment manifest:
//
// env:
// - name: K8S_NAMESPACE
// valueFrom: {fieldRef: {fieldPath: metadata.namespace}}
// - name: K8S_NODE_NAME
// valueFrom: {fieldRef: {fieldPath: spec.nodeName}}
func containerFields() []zap.Field {
var fields []zap.Field
// HOSTNAME is automatically set to the pod name by Kubernetes.
if pod := os.Getenv("HOSTNAME"); pod != "" {
fields = append(fields, zap.String("pod", pod))
}
if ns := os.Getenv("K8S_NAMESPACE"); ns != "" {
fields = append(fields, zap.String("namespace", ns))
}
if node := os.Getenv("K8S_NODE_NAME"); node != "" {
fields = append(fields, zap.String("node", node))
}
return fields
}
// initLogger return successfully initialized zap logger
func initLogger(lCfg config.LoggerConfig) *zap.Logger {
writeSyncer := getWriteSyncer(lCfg)
encoder := getEncoder(lCfg.Mode)
l := new(zapcore.Level)
if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil {
panic(err)
}
core := zapcore.NewCore(encoder, writeSyncer, l)
opts := []zap.Option{zap.AddCaller()}
if lCfg.Mode != constants.DevelopmentLogMode {
opts = append(opts, zap.Fields(containerFields()...))
}
logger := zap.New(core, opts...)
zap.ReplaceGlobals(logger)
return logger
}
// InitLoggerInstance define func of return instance of zap logger
func InitLoggerInstance(lCfg config.LoggerConfig) {
once.Do(func() {
_globalLogger = initLogger(lCfg)
})
}
// GetLoggerInstance define func of returns the global logger instance It's safe for concurrent use.
func GetLoggerInstance() *zap.Logger {
_globalLoggerMu.RLock()
logger := _globalLogger
_globalLoggerMu.RUnlock()
return logger
}

148
main.go Normal file
View File

@ -0,0 +1,148 @@
// entry function
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"eventRT/config"
"eventRT/constants"
"eventRT/database"
"eventRT/event"
"eventRT/handler"
"eventRT/logger"
"eventRT/middleware"
"eventRT/mq"
"eventRT/util"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
)
var (
eventRTConfigDir = flag.String("eventRT_config_dir", "./configs", "config file dir")
eventRTConfigName = flag.String("eventRT_config_name", "config", "config file name")
eventRTConfigType = flag.String("eventRT_config_type", "yaml", "config file type")
)
func main() {
flag.Parse()
configPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+"."+*eventRTConfigType)
if _, err := os.Stat(configPath); os.IsNotExist(err) {
log.Println("configuration file not found,checking for example file")
exampleConfigPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+".example."+*eventRTConfigType)
configDir := filepath.Dir(configPath)
if err := os.MkdirAll(configDir, 0o755); err != nil {
panic(fmt.Errorf("failed to create config directory %s:%w", configDir, err))
}
if _, err := os.Stat(exampleConfigPath); err == nil {
if err := util.CopyFile(exampleConfigPath, configPath); err != nil {
panic(fmt.Errorf("failed to copy example config file:%w", err))
}
} else {
panic(errors.New("no config file and no config example file found"))
}
}
eventRTConfig := config.ReadAndInitConfig(*eventRTConfigDir, *eventRTConfigName, *eventRTConfigType)
// init logger instance
logger.InitLoggerInstance(eventRTConfig.LoggerConfig)
defer logger.GetLoggerInstance().Sync()
// init OTel TracerProvider
tp, tpErr := middleware.InitTracerProvider(context.Background(), eventRTConfig)
if tpErr != nil {
log.Printf("warn: OTLP tracer init failed, tracing disabled: %v", tpErr)
}
if tp != nil {
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tp.Shutdown(shutdownCtx)
}()
}
notifyCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
notifyCtx, startupSpan := otel.Tracer("eventRT/main").Start(notifyCtx, "startup")
startupSpan.End()
// init MongoDB client
client := database.InitMongoInstance(notifyCtx, eventRTConfig)
defer func() {
disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Disconnect(disconnectCtx); err != nil {
logger.Error(notifyCtx, "mongodb disconnect failed", "err", err)
} else {
logger.Info(notifyCtx, "mongodb connection closed gracefully")
}
}()
// init RabbitMQ connection
mq.InitRabbitProxy(notifyCtx, eventRTConfig.RabbitMQConfig)
defer mq.GetConn().Close()
go event.ReceiptUpDownLimitAlarm(notifyCtx)
// use release mode in production
if eventRTConfig.DeployEnv == constants.ProductionDeployMode {
gin.SetMode(gin.ReleaseMode)
}
engine := gin.New()
engine.Use(gin.Logger(), gin.Recovery())
// TODO k8s liveness & readiness probe endpoints
// engine.GET("/ping", func(c *gin.Context) {
// c.JSON(200, gin.H{"status": "ok"})
// })
events := engine.Group("/events")
{
events.GET("/:event_uuid", handler.GetEventHandler)
events.PATCH("/:event_uuid/confirm", handler.ConfirmEventHandler)
events.PATCH("/:event_uuid/close", handler.CloseEventHandler)
}
server := http.Server{
Addr: eventRTConfig.ServiceAddr,
Handler: engine,
}
go func() {
<-notifyCtx.Done()
ctx := context.Background()
logger.Info(ctx, "shutdown signal received, cleaning up...")
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Error(shutdownCtx, "server shutdown failed", "err", err)
}
mq.CloseRabbitProxy()
logger.Info(ctx, "resources cleaned up, exiting")
}()
logger.Info(notifyCtx, "starting EventRT server")
err := server.ListenAndServe()
if err != nil {
if err == http.ErrServerClosed {
// the service receives the shutdown signal normally and then closes
logger.Info(notifyCtx, "server closed under request")
} else {
// abnormal shutdown of service
logger.Error(notifyCtx, "server closed unexpected", "err", err)
}
}
}

82
middleware/trace.go Normal file
View File

@ -0,0 +1,82 @@
// Package middleware defines OTel tracing infrastructure and gin middlewares.
package middleware
import (
"context"
"fmt"
"eventRT/config"
"eventRT/constants"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"
)
// InitTracerProvider creates an OTLP TracerProvider and registers it as the global provider.
// It registers the W3C TraceContext propagator (traceparent header).
// The caller is responsible for calling Shutdown on the returned provider during graceful shutdown.
func InitTracerProvider(ctx context.Context, cfg config.EventRTConfig) (*sdktrace.TracerProvider, error) {
opts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(cfg.OtelConfig.Endpoint),
}
if cfg.OtelConfig.Insecure {
opts = append(opts, otlptracehttp.WithInsecure())
}
exporter, err := otlptracehttp.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("create OTLP exporter: %w", err)
}
res := sdkresource.NewSchemaless(
attribute.String("service.name", cfg.ServiceName),
attribute.String("deployment.environment", cfg.DeployEnv),
)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{})
return tp, nil
}
// StartTrace extracts upstream W3C trace context from request headers and starts a server span.
func StartTrace() gin.HandlerFunc {
tracer := otel.Tracer("eventRT/http")
return func(c *gin.Context) {
ctx := otel.GetTextMapPropagator().Extract(
c.Request.Context(),
propagation.HeaderCarrier(c.Request.Header),
)
spanName := c.FullPath()
if spanName == "" {
spanName = c.Request.URL.Path
}
ctx, span := tracer.Start(ctx, spanName,
oteltrace.WithSpanKind(oteltrace.SpanKindServer),
)
defer span.End()
spanCtx := span.SpanContext()
ctx = context.WithValue(ctx, constants.CtxKeyTraceID, spanCtx.TraceID().String())
ctx = context.WithValue(ctx, constants.CtxKeySpanID, spanCtx.SpanID().String())
c.Request = c.Request.WithContext(ctx)
c.Set(constants.HeaderTraceID, spanCtx.TraceID().String())
c.Set(constants.HeaderSpanID, spanCtx.SpanID().String())
c.Next()
}
}

217
mq/mq_init.go Normal file
View File

@ -0,0 +1,217 @@
// Package mq define message queue operation functions
package mq
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"sync"
"time"
"eventRT/config"
"eventRT/logger"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/youmark/pkcs8"
)
var (
_globalRabbitMQProxy *RabbitMQProxy
rabbitMQOnce sync.Once
)
// RabbitMQProxy define stuct of rabbitMQ connection proxy
type RabbitMQProxy struct {
tlsConf *tls.Config
conn *amqp.Connection
cancel context.CancelFunc
mu sync.Mutex
}
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
type rabbitMQCertConf struct {
serverName string
insecureSkipVerify bool
clientCert tls.Certificate
caCertPool *x509.CertPool
}
// GetConn define func to return the rabbitMQ connection
func GetConn() *amqp.Connection {
_globalRabbitMQProxy.mu.Lock()
defer _globalRabbitMQProxy.mu.Unlock()
return _globalRabbitMQProxy.conn
}
// InitRabbitProxy return instance of rabbitMQ connection
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
amqpURI := generateRabbitMQURI(rCfg)
tlsConf, err := initCertConf(rCfg)
if err != nil {
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
panic(err)
}
rabbitMQOnce.Do(func() {
cancelCtx, cancel := context.WithCancel(ctx)
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
})
return _globalRabbitMQProxy
}
// initRabbitMQ return instance of rabbitMQ connection
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err != nil {
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
panic(err)
}
return conn
}
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
for {
closeChan := make(chan *amqp.Error)
GetConn().NotifyClose(closeChan)
select {
case <-ctx.Done():
logger.Info(ctx, "context cancelled, exiting handleReconnect")
return
case err, ok := <-closeChan:
if !ok {
logger.Info(ctx, "rabbitMQ notify channel closed")
return
}
if err == nil {
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
return
}
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
}
if !p.reconnect(ctx, rabbitMQURI) {
return
}
}
}
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
for {
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
select {
case <-ctx.Done():
return false
case <-time.After(5 * time.Second):
}
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: p.tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err == nil {
p.mu.Lock()
p.conn = newConn
p.mu.Unlock()
logger.Info(ctx, "rabbitMQ reconnected successfully")
return true
}
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
}
}
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
func CloseRabbitProxy() {
if _globalRabbitMQProxy != nil {
_globalRabbitMQProxy.cancel()
_globalRabbitMQProxy.mu.Lock()
if _globalRabbitMQProxy.conn != nil {
_globalRabbitMQProxy.conn.Close()
}
_globalRabbitMQProxy.mu.Unlock()
}
}
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
// TODO 考虑拆分用户名密码配置项,兼容不同认证方式
// user := url.QueryEscape(rCfg.User)
// password := url.QueryEscape(rCfg.Password)
// amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
// user,
// password,
// rCfg.Host,
// rCfg.Port,
// )
amqpURI := fmt.Sprintf("amqps://%s:%d/",
rCfg.Host,
rCfg.Port,
)
return amqpURI
}
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
tlsConf := &tls.Config{
InsecureSkipVerify: rCfg.InsecureSkipVerify,
ServerName: rCfg.ServerName,
}
caCert, err := os.ReadFile(rCfg.CACertPath)
if err != nil {
return nil, fmt.Errorf("read server ca file failed: %w", err)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
}
tlsConf.RootCAs = caCertPool
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
if err != nil {
return nil, fmt.Errorf("read client cert file failed: %w", err)
}
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("read private key file failed: %w", err)
}
block, _ := pem.Decode(keyData)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block from private key")
}
der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
if err != nil {
return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
}
privBytes, err := x509.MarshalPKCS8PrivateKey(der)
if err != nil {
return nil, fmt.Errorf("marshal private key failed: %w", err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, fmt.Errorf("create x509 key pair failed: %w", err)
}
tlsConf.Certificates = []tls.Certificate{clientCert}
return tlsConf, nil
}

35
util/copy.go Normal file
View File

@ -0,0 +1,35 @@
// Package util provide some utility functions
package util
import (
"fmt"
"io"
"os"
)
// CopyFile define func of copies a file from src to dst.
// If the destination file exists, it will be overwritten.
func CopyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return fmt.Errorf("failed to open source file %s: %w", src, err)
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return fmt.Errorf("failed to create destination file %s: %w", dst, err)
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
if err != nil {
return fmt.Errorf("failed to copy file contents from %s to %s: %w", src, dst, err)
}
err = destFile.Sync()
if err != nil {
return fmt.Errorf("failed to sync destination file %s: %w", dst, err)
}
return nil
}