diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..97a6cda --- /dev/null +++ b/.drone.yml @@ -0,0 +1,12 @@ +kind: pipeline +type: docker +name: default + +steps: + - name: build + image: golang:latest + environment: + GO111MODULE: on + GOPROXY: https://goproxy.cn,direct + commands: + - go build main.go diff --git a/.gitignore b/.gitignore index adf8f72..531a22a 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,11 @@ # Go workspace file go.work +# vscode files +.vscode/ + +# Shield all log files in the log folder +/log/ +# Shield config files in the configs folder +/configs/**/*.yaml +/configs/**/*.pem \ No newline at end of file diff --git a/README.md b/README.md index 8a8cd23..01b28df 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # eventRT -事件处理服务 \ No newline at end of file +事件处理服务 + +[![Build Status](http://192.168.46.100:4080/api/badges/CL-Softwares/eventRT/status.svg?ref=refs/heads/feature-joinDebuggingDemo)](http://192.168.46.100:4080/CL-Softwares/eventRT) \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..18ca080 --- /dev/null +++ b/config/config.go @@ -0,0 +1,109 @@ +// Package config define config struct of model runtime service +package config + +import ( + "fmt" + "net/url" + + "github.com/spf13/viper" +) + +// LokiConfig define config struct of loki direct-push (used in development mode) +type LokiConfig struct { + Endpoint string `mapstructure:"endpoint"` // empty disables direct push + Labels map[string]string `mapstructure:"labels"` +} + +// LoggerConfig define config struct of zap logger config +type LoggerConfig struct { + Mode string `mapstructure:"mode"` + Level string `mapstructure:"level"` + FilePath string `mapstructure:"filepath"` // empty disables file rotation in container modes + MaxSize int `mapstructure:"maxsize"` + MaxBackups int `mapstructure:"maxbackups"` + MaxAge int `mapstructure:"maxage"` + Compress bool `mapstructure:"compress"` + Loki LokiConfig `mapstructure:"loki"` +} + +// MongoDBConfig define config struct of mongoDB config +type MongoDBConfig struct { + Host string `mapstructure:"host"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + Database string `mapstructure:"database"` + AuthDB string `mapstructure:"auth_db"` + Port int `mapstructure:"port"` + Timeout int `mapstructure:"timeout"` +} + +type RabbitMQConfig struct { + CACertPath string `mapstructure:"ca_cert_path"` + ClientKeyPath string `mapstructure:"client_key_path"` + ClientKeyPassword string `mapstructure:"client_key_password"` + ClientCertPath string `mapstructure:"client_cert_path"` + InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` + ServerName string `mapstructure:"server_name"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` +} + +// ServiceConfig define config struct of service config +type ServiceConfig struct { + ServiceAddr string `mapstructure:"service_addr"` + ServiceName string `mapstructure:"service_name"` + SecretKey string `mapstructure:"secret_key"` + DeployEnv string `mapstructure:"deploy_env"` +} + +// OtelConfig define config struct of OpenTelemetry tracing +type OtelConfig struct { + Endpoint string `mapstructure:"endpoint"` // e.g. "localhost:4318" + Insecure bool `mapstructure:"insecure"` +} + +// EventRTConfig define config struct of model runtime server +type EventRTConfig struct { + ServiceConfig `mapstructure:"service"` + LoggerConfig `mapstructure:"logger"` + MongoDBConfig `mapstructure:"mongodb"` + RabbitMQConfig `mapstructure:"rabbitMQ"` + OtelConfig OtelConfig `mapstructure:"otel"` + MongoDBURI string +} + +// ReadAndInitConfig return eventRT project config struct +func ReadAndInitConfig(configDir, configName, configType string) (eventRTConfig EventRTConfig) { + config := viper.New() + config.AddConfigPath(configDir) + config.SetConfigName(configName) + config.SetConfigType(configType) + if err := config.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); ok { + panic(fmt.Sprintf("can not find conifg file:%s\n", err.Error())) + } + panic(err) + } + + config.BindEnv("mongodb.password", "MONGODB_PASSWORD") + config.BindEnv("service.secret_key", "SERVICE_SECRET_KEY") + + if err := config.Unmarshal(&eventRTConfig); err != nil { + panic(fmt.Sprintf("unmarshal eventRT config failed:%s\n", err.Error())) + } + + if eventRTConfig.MongoDBConfig.Timeout <= 0 { + eventRTConfig.MongoDBConfig.Timeout = 10 + } + if eventRTConfig.MongoDBConfig.AuthDB == "" { + eventRTConfig.MongoDBConfig.AuthDB = "admin" + } + + // init mongoDB uri + user := url.QueryEscape(eventRTConfig.MongoDBConfig.User) + password := url.QueryEscape(eventRTConfig.MongoDBConfig.Password) + eventRTConfig.MongoDBURI = fmt.Sprintf("mongodb://%s:%s@%s:%d/?authSource=%s", user, password, eventRTConfig.MongoDBConfig.Host, eventRTConfig.MongoDBConfig.Port, eventRTConfig.MongoDBConfig.AuthDB) + return eventRTConfig +} diff --git a/constants/deploy_mode.go b/constants/deploy_mode.go new file mode 100644 index 0000000..3bc98c6 --- /dev/null +++ b/constants/deploy_mode.go @@ -0,0 +1,11 @@ +// Package constants define constant variable +package constants + +const ( + // DevelopmentDeployMode define development operator environment for eventRT project + DevelopmentDeployMode = "development" + // DebugDeployMode define debug operator environment for eventRT project + DebugDeployMode = "debug" + // ProductionDeployMode define production operator environment for eventRT project + ProductionDeployMode = "production" +) diff --git a/constants/event.go b/constants/event.go new file mode 100644 index 0000000..be70758 --- /dev/null +++ b/constants/event.go @@ -0,0 +1,29 @@ +// Package constants define constant variable +package constants + +const ( + // EventStatusHappened define status for event record when event just happened, no data attached yet + EventStatusHappened = iota + // EventStatusDataAttached define status for event record when event data attached, ready to be sent + EventStatusDataAttached + // EventStatusReported define status for event record when event reported to downstream + EventStatusReported + // EventStatusConfirmed define status for event record when event confirmed by operator or CIM + EventStatusConfirmed + // EventStatusClosed define status for event record when event closed due to condition recovery or manual close + EventStatusClosed +) + +const ( + // EventUIExchangeName define exchange name for pushing events to UI consumers + EventUIExchangeName = "event-ui-exchange" + // EventUIQueueName define queue name for UI consumers to subscribe to events + EventUIQueueName = "event-ui-queue" +) + +const ( + // EventDBName define MongoDB database name for event storage + EventDBName = "eventdb" + // EventCollectionName define MongoDB collection name for alarm event records + EventCollectionName = "alarms" +) diff --git a/constants/log_mode.go b/constants/log_mode.go new file mode 100644 index 0000000..078b6a0 --- /dev/null +++ b/constants/log_mode.go @@ -0,0 +1,11 @@ +// Package constants define constant variable +package constants + +const ( + // DevelopmentLogMode define development operator environment for eventRT project + DevelopmentLogMode = "development" + // DebugLogMode define debug operator environment for eventRT project + DebugLogMode = "debug" + // ProductionLogMode define production operator environment for eventRT project + ProductionLogMode = "production" +) diff --git a/constants/trace.go b/constants/trace.go new file mode 100644 index 0000000..e5e595a --- /dev/null +++ b/constants/trace.go @@ -0,0 +1,21 @@ +// Package constants define constant variable +package constants + +// Internal context keys for trace values set by StartTrace middleware. +// These are gin/stdlib context keys only — actual W3C header propagation +// (traceparent / tracestate) is handled automatically by the OTel propagator. +const ( + HeaderTraceID = "trace-id" + HeaderSpanID = "span-id" + HeaderParentSpanID = "parent-span-id" +) + +// traceCtxKey is an unexported type for context keys to avoid collisions with other packages. +type traceCtxKey string + +// Typed context keys for trace values — use these with context.WithValue / ctx.Value. +var ( + CtxKeyTraceID = traceCtxKey(HeaderTraceID) + CtxKeySpanID = traceCtxKey(HeaderSpanID) + CtxKeyParentSpanID = traceCtxKey(HeaderParentSpanID) +) diff --git a/database/mongo_init.go b/database/mongo_init.go new file mode 100644 index 0000000..0606537 --- /dev/null +++ b/database/mongo_init.go @@ -0,0 +1,56 @@ +// Package database define database operation functions +package database + +import ( + "context" + "sync" + "time" + + "eventRT/config" // + "eventRT/logger" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var ( + mongoOnce sync.Once + _globalMongoClient *mongo.Client +) + +// GetMongoClient returns the global MongoDB client +func GetMongoClient() *mongo.Client { + return _globalMongoClient +} + +// InitMongoInstance return instance of MongoDB client +func InitMongoInstance(ctx context.Context, eCfg config.EventRTConfig) *mongo.Client { + mongoOnce.Do(func() { + _globalMongoClient = initMongoClient(ctx, eCfg.MongoDBURI, eCfg.Timeout) + }) + return _globalMongoClient +} + +// initMongoClient return successfully initialized MongoDB client +func initMongoClient(ctx context.Context, mongoDBURI string, timeout int) *mongo.Client { + clientOptions := options.Client().ApplyURI(mongoDBURI) + client, err := mongo.Connect(ctx, clientOptions) + if err != nil { + logger.Error(ctx, "failed to connect to MongoDB", "error", err) + panic(err) + } + + pingTimeout := time.Duration(timeout) * time.Second + if pingTimeout == 0 { + pingTimeout = 10 * time.Second + } + + pingCtx, cancel := context.WithTimeout(ctx, pingTimeout) + defer cancel() + if err := client.Ping(pingCtx, nil); err != nil { + logger.Error(ctx, "failed to ping operation with MongoDB", "error", err) + panic(err) + } + + return client +} diff --git a/deploy/deploy.md b/deploy/deploy.md new file mode 100644 index 0000000..119116d --- /dev/null +++ b/deploy/deploy.md @@ -0,0 +1,689 @@ +# 项目部署指南 + +本项目依赖 `MongoDB`(事件持久化存储)与 `RabbitMQ`(mTLS 消息队列),并通过 `OpenTelemetry + Jaeger` 完成链路追踪。 + +## 前提条件 + +1. 已安装 `Docker` 与 `kubectl` +2. `Minikube` 集群已启动并可访问 +3. 确保以下端口在宿主机上未被占用:`27017`(MongoDB)、`5671`(RabbitMQ AMQP) + +--- + +### 1\. 部署 MongoDB 数据库 + +EventRT 支持两种 MongoDB 部署方式,根据场景二选一:**Docker**(本地开发 / Ubuntu 宿主机直跑)或 **K8s**(Minikube 环境)。 + +#### 1.1 Docker 部署(本地开发) + +使用官方 `mongo:7.0` 镜像,在 Ubuntu 宿主机(`192.168.1.101`)上以 Docker 容器运行。 + +##### 1.1.1 部署命令 + +```bash +docker run --name mongodb \ + -e MONGO_INITDB_ROOT_USERNAME=coslight \ + -e MONGO_INITDB_ROOT_PASSWORD=coslight@tj \ + -p 27017:27017 \ + -d mongo:7.0 +``` + +##### 1.1.2 连接信息 + +| 参数 | 值 | 说明 | +| :--- | :--- | :--- | +| **容器名称** | `mongodb` | 容器名 | +| **镜像版本** | `mongo:7.0` | MongoDB 7.0 | +| **主机端口** | `27017` | 外部应用连接端口 | +| **用户名** | `coslight` | Root 管理员 | +| **密码** | `coslight@tj` | 启动时通过 `MONGO_INITDB_ROOT_PASSWORD` 设置 | +| **鉴权数据库** | `admin` | `auth_db` | +| **业务数据库** | `eventdb` | EventRT 事件存储库 | + +##### 1.1.3 状态检查 + +```bash +# 检查容器启动状态 +docker ps -a | grep mongodb +# 检查启动日志 +docker logs mongodb +``` + +> **注意:** 密码当前以明文形式写在 `docker run` 命令中,生产环境应通过 Docker Secret 或环境变量文件(`--env-file`)传入,避免在 Shell 历史记录中留存明文密码。 + +##### 1.1.4 连接验证 + +```bash +# 快速检查 MongoDB 是否接受连接 +docker exec -it mongodb mongosh \ + -u coslight -p "coslight@tj" --authenticationDatabase admin \ + --eval "db.adminCommand({ ping: 1 })" + +# 列出所有数据库(确认服务正常) +docker exec -it mongodb mongosh \ + -u coslight -p "coslight@tj" --authenticationDatabase admin \ + --eval "show dbs" +``` + +##### 1.1.5 初始化 eventdb 数据库 + +MongoDB 启动后进入容器,为 `eventdb` 库授权: + +```bash +docker exec -it mongodb mongosh \ + -u coslight -p "coslight@tj" --authenticationDatabase admin +``` + +在 `mongosh` 中执行: + +```javascript +// 切换到 eventdb,若不存在则自动创建 +use eventdb + +// 授予 coslight 用户对 eventdb 的读写权限(根用户已有全库权限,此步可选) +db.createUser({ + user: "coslight", + pwd: "coslight@tj", + roles: [{ role: "readWrite", db: "eventdb" }] +}) +``` + +#### 1.2 K8s 部署(Minikube) + +YAML 文件位于 `deploy/k8s/`(从 modelrt 仓库迁移而来,需确认文件已拷贝至本项目)。 + +```bash +kubectl apply -f deploy/k8s/mongodb-secret.yaml +kubectl apply -f deploy/k8s/mongodb-pvc.yaml +kubectl apply -f deploy/k8s/mongodb-statefulset.yaml +kubectl apply -f deploy/k8s/mongodb-service.yaml +``` + +| 参数 | 值 | 说明 | +| :--- | :--- | :--- | +| **镜像** | `mongo:7.0` | MongoDB 7.0 | +| **NodePort** | `30017` | 集群外访问端口 | +| **用户名** | `admin` | Root 管理员(Secret `mongodb-secret`) | +| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 | +| **存储** | `2Gi` | PVC `mongodb-data` | +| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 | +| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 | + +> **注意:** 密码存储在 `mongodb-secret.yaml` 的 `stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。 + +##### 1.2.1 等待 Pod 就绪 + +```bash +kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s +``` + +##### 1.2.2 连接验证 + +```bash +kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \ + -- mongosh -u admin -p coslight --authenticationDatabase admin \ + --eval "db.adminCommand({ ping: 1 })" +``` + +##### 1.2.3 初始化 eventdb 数据库 + +```bash +kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \ + -- mongosh -u admin -p coslight --authenticationDatabase admin +``` + +在 `mongosh` 中执行: + +```javascript +use eventdb + +db.createUser({ + user: "coslight", + pwd: "coslight@tj", + roles: [{ role: "readWrite", db: "eventdb" }] +}) +``` + +##### 1.2.4 状态检查 + +```bash +kubectl get pods -l app=mongodb +kubectl logs -l app=mongodb --tail=30 +``` + +--- + +### 2\. 部署 RabbitMQ(Kubernetes) + +RabbitMQ 配置为仅允许 TLS 连接(`listeners.tcp = none`),所有客户端须持有由同一 CA 签发的证书。YAML 文件位于 `deploy/mq/`。 + +#### 2.1 TLS 证书生成 + +##### 2.1.1 生成根 CA + +```bash +git clone https://github.com/rabbitmq/tls-gen.git +cd tls-gen/basic + +# 生成根 CA,结果输出至 result/ +make CN=rabbitmq-server +# ca_certificate.pem 与 ca_key.pem(即 cakey.pem)生成于 result/ +``` + +##### 2.1.2 生成服务端证书 + +服务端证书需包含 SAN,使其同时匹配集群内 DNS 和 Minikube IP。使用 `deploy/mq/server.conf`(已预置正确配置): + +```bash +# 将 ca_certificate.pem 和 cakey.pem 放在当前目录 +openssl genrsa -out server_key.pem 2048 + +openssl req -new -key server_key.pem \ + -out server_cert.csr -config deploy/mq/server.conf + +openssl x509 -req -in server_cert.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out server_certificate.pem -days 730 -sha256 \ + -extfile deploy/mq/server.conf -extensions v3_server + +rm server_cert.csr +``` + +##### 2.1.3 生成 EventRT 客户端证书 + +CN 必须与 RabbitMQ 中注册的用户名一致(`eventrt-client`)。使用 `deploy/mq/eventrt.conf`: + +```bash +openssl genrsa -out eventrt_client_key.pem 2048 + +openssl req -new -key eventrt_client_key.pem \ + -out eventrt_client.csr -config deploy/mq/eventrt.conf + +openssl x509 -req -in eventrt_client.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out eventrt_client_cert.pem -days 365 \ + -extensions v3_client -extfile deploy/mq/eventrt.conf + +rm eventrt_client.csr +``` + +##### 2.1.4 生成 ModelRT 客户端证书 + +使用 `deploy/mq/modelrt.conf`(CN 为 `modelrt-client`): + +```bash +openssl genrsa -out modelrt_client_key.pem 2048 + +openssl req -new -key modelrt_client_key.pem \ + -out modelrt_client.csr -config deploy/mq/modelrt.conf + +openssl x509 -req -in modelrt_client.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out modelrt_client_cert.pem -days 365 \ + -extensions v3_client -extfile deploy/mq/modelrt.conf + +rm modelrt_client.csr +``` + +##### 2.1.5 验证证书 + +```bash +# 验证服务端证书 +openssl verify -CAfile ca_certificate.pem server_certificate.pem + +# 验证客户端证书 +openssl verify -CAfile ca_certificate.pem eventrt_client_cert.pem +openssl verify -CAfile ca_certificate.pem modelrt_client_cert.pem + +# 确认 CN 和扩展(clientAuth / serverAuth) +openssl x509 -in server_certificate.pem -noout -subject -ext subjectAltName +openssl x509 -in eventrt_client_cert.pem -noout -subject -text | grep -A1 "Extended Key Usage" +openssl x509 -in modelrt_client_cert.pem -noout -subject +``` + +#### 2.2 部署 RabbitMQ + +##### 2.2.1 创建证书 Secret + +在证书文件所在目录执行: + +```bash +sh deploy/mq/secert.sh +``` + +该脚本等价于: + +```bash +kubectl create secret generic rabbitmq-certs \ + --from-file=ca_certificate.pem=./certs/ca_certificate.pem \ + --from-file=server_certificate.pem=./certs/server_certificate.pem \ + --from-file=server_key.pem=./certs/server_key.pem +``` + +##### 2.2.2 部署 + +```bash +kubectl apply -f deploy/mq/rabbitmq-secret.yaml +kubectl apply -f deploy/mq/rabbitmq-users-config.yaml +kubectl apply -f deploy/mq/rabbitmq-config.yaml +kubectl apply -f deploy/mq/rabbitmq-deployment.yaml +kubectl apply -f deploy/mq/rabbitmq-service.yaml +``` + +##### 2.2.3 端口汇总 + +| 端口 | NodePort | 说明 | +| :--- | :--- | :--- | +| `5671` | `30671` | AMQP over TLS(客户端连接) | +| `5672` | `30672` | AMQP 明文(内部备用,生产禁用) | +| `15671` | `31671` | Management UI over TLS | +| `15672` | `31672` | Management UI 明文(内部备用) | + +##### 2.2.4 用户与权限说明 + +用户定义在 `rabbitmq-users-config.yaml` 的 `definitions.json` 中,启动时通过 `load_definitions` 自动加载: + +| 用户 | 认证方式 | 权限 | 说明 | +| :--- | :--- | :--- | :--- | +| `coslight` | 密码 | administrator | 管理员,密码在 `rabbitmq-secret.yaml` | +| `eventrt-client` | X.509 证书(CN) | configure/read/write | EventRT 服务专用 | +| `modelrt-client` | X.509 证书(CN) | configure/read/write | ModelRT 服务专用 | +| `web-client` | X.509 证书(CN) | read/write | Web 客户端 | + +> **注意:** 证书认证用户的 `password_hash` 留空;RabbitMQ 通过 `ssl_cert_login_from = common_name` 将证书 CN 映射为用户名。 + +--- + +### 3\. 部署 EventRT(Kubernetes) + +所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。 + +#### 3.1 构建并推送镜像 + +镜像采用三阶段构建,最终基于 `scratch`: + +| 阶段 | 基础镜像 | 作用 | +| :--- | :--- | :--- | +| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0`,`-trimpath -ldflags="-s -w"`) | +| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义(UID 默认 `1000`) | +| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 | + +**方式一:从源码构建并加载** + +```bash +# 在项目根目录执行(默认运行用户 UID=1000) +docker build -f deploy/dockerfile/eventrt.Dockerfile -t coslight/eventrt:latest . + +# 自定义运行用户 UID +docker build -f deploy/dockerfile/eventrt.Dockerfile \ + --build-arg USER_ID=2000 \ + -t coslight/eventrt:latest . + +# 加载到 Minikube(无需私有仓库) +minikube image load coslight/eventrt:latest +``` + +**方式二:直接加载已有本地镜像** + +Ubuntu 宿主机上已存在构建好的镜像(如 `eventrt:v1`)时,无需重新构建,直接导入 Minikube: + +```bash +# 确认本地镜像存在 +docker images eventrt:v1 + +# 加载到 Minikube +minikube image load eventrt:v1 + +# 验证镜像已进入 Minikube 缓存 +minikube image ls | grep eventrt +``` + +> **注意:** `deploy/k8s/eventrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。 + +#### 3.1.1 镜像冒烟测试 + +```bash +# 查看镜像大小(scratch 镜像预期 ≤ 25 MB) +docker images coslight/eventrt:latest + +# 检查镜像元信息(确认 User、Cmd、架构) +docker inspect coslight/eventrt:latest + +# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常) +docker run --rm coslight/eventrt:latest + +# 挂载示例配置做完整启动验证(Ctrl+C 退出) +docker run --rm \ + -v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \ + -p 8081:8081 \ + coslight/eventrt:latest +``` + +> **注意:** `scratch` 镜像不含 shell,无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`。 + +#### 3.2 创建客户端证书 Secret + +在 RabbitMQ TLS 证书生成完成后(见 2.1),进入证书文件所在目录执行: + +```bash +sh deploy/k8s/eventrt-certs-secret.sh +``` + +该脚本等价于: + +```bash +kubectl create secret generic eventrt-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=eventrt_client_cert.pem=./eventrt_client_cert.pem \ + --from-file=eventrt_client_key.pem=./eventrt_client_key.pem +``` + +#### 3.3 部署 + +```bash +kubectl apply -f deploy/k8s/eventrt-secret.yaml +kubectl apply -f deploy/k8s/eventrt-configmap.yaml +kubectl apply -f deploy/k8s/eventrt-deployment.yaml +kubectl apply -f deploy/k8s/eventrt-service.yaml +``` + +等待 Pod 就绪: + +```bash +kubectl wait --for=condition=ready pod -l app=eventrt --timeout=120s +``` + +#### 3.4 配置说明 + +| 配置项 | 方式 | 说明 | +| :--- | :--- | :--- | +| `mongodb.password` | Secret `eventrt-secret` | 不写入 ConfigMap,通过环境变量 `MONGODB_PASSWORD` 注入 | +| `service.secret_key` | Secret `eventrt-secret` | 不写入 ConfigMap,通过环境变量 `SERVICE_SECRET_KEY` 注入 | +| RabbitMQ 客户端证书 | Secret `eventrt-certs` | 挂载至 `/app/configs/certs/` | +| `config.yaml` 其余配置 | ConfigMap `eventrt-config` | `rabbitmq.host` 已设为 K8s Service 名 `rabbitmq-service` | +| `K8S_NAMESPACE` / `K8S_NODE_NAME` | Downward API | 注入至日志全局字段 | + +#### 3.5 状态检查 + +```bash +# 查看 Pod 状态 +kubectl get pods -l app=eventrt + +# 查看启动日志 +kubectl logs -l app=eventrt --tail=50 + +# 查看 Service +kubectl get svc eventrt-service +``` + +#### 3.6 端口汇总 + +| NodePort | 说明 | +| :--- | :--- | +| `30081` | EventRT HTTP API | + +#### 3.7 清理 + +```bash +kubectl delete -f deploy/k8s/eventrt-service.yaml \ + -f deploy/k8s/eventrt-deployment.yaml \ + -f deploy/k8s/eventrt-configmap.yaml \ + -f deploy/k8s/eventrt-secret.yaml +kubectl delete secret eventrt-certs +``` + +--- + +### 4\. Mac 本地访问(SSH 隧道) + +`EventRT` 在 Mac 本地运行时,RabbitMQ 部署在 Ubuntu 宿主机(`192.168.1.101`)的 Minikube 中,MongoDB 直接运行在宿主机上。需通过 SSH 本地端口转发建立访问隧道。 + +#### 4.1 网络拓扑 + +```text +Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101) ──▶ Minikube NodePort (192.168.49.2) + └──▶ MongoDB Docker (宿主机 27017) +``` + +#### 4.2 建立隧道 + +```bash +ssh -L 27017:127.0.0.1:27017 \ + -L 5671:192.168.49.2:30671 \ + -L 15671:192.168.49.2:31671 \ + -L 4318:192.168.49.2:31318 \ + -L 16686:192.168.49.2:31686 \ + douxu@192.168.1.101 +``` + +如需后台静默运行(不占用终端): + +```bash +ssh -fN \ + -L 27017:127.0.0.1:27017 \ + -L 5671:192.168.49.2:30671 \ + -L 15671:192.168.49.2:31671 \ + -L 4318:192.168.49.2:31318 \ + -L 16686:192.168.49.2:31686 \ + douxu@192.168.1.101 +``` + +#### 4.3 端口映射说明 + +| Mac 本地端口 | 目标 | 服务 | 说明 | +| :--- | :--- | :--- | :--- | +| `27017` | 宿主机 `127.0.0.1:27017` | MongoDB | EventRT 事件数据库 | +| `5671` | Minikube `192.168.49.2:30671` | RabbitMQ AMQP | 消息队列 mTLS 连接 | +| `15671` | Minikube `192.168.49.2:31671` | RabbitMQ Management | 管理界面 `http://localhost:15671` | +| `4318` | Minikube `192.168.49.2:31318` | OTLP HTTP | OTel Trace 上报(Jaeger Collector) | +| `16686` | Minikube `192.168.49.2:31686` | Jaeger UI | 链路追踪查询 `http://localhost:16686` | + +> **注意:** 隧道建立后,本地 `config.yaml` 中所有服务地址填 `localhost:<本地端口>` 即可直接运行服务。 + +#### 4.4 关闭隧道 + +前台运行时直接 `Ctrl+C`;后台运行时查找并终止进程: + +```bash +# 找到 ssh 隧道进程 +ps aux | grep "ssh -fN" +# 终止(替换为实际 PID) +kill +``` + +--- + +### 5\. 本地运行(go run / 二进制) + +#### 5.1 配置服务配置文件 + +将 `configs/config.example.yaml` 复制为 `configs/config.yaml` 并按以下说明调整: + +##### 5.1.1 配置参数说明 + +| 类别 | 参数名 | 作用描述 | 示例值 | +| :--- | :--- | :--- | :--- | +| **MongoDB** | `host` | MongoDB 服务器地址 | `"localhost"` | +| | `port` | MongoDB 端口 | `27017` | +| | `user` | 连接用户名 | `"coslight"` | +| | `password` | 连接密码 | `"coslight@tj"` | +| | `database` | 业务数据库名 | `"eventdb"` | +| | `auth_db` | 鉴权数据库 | `"admin"` | +| | `timeout` | 连接超时(秒) | `10` | +| **RabbitMQ** | `host` | RabbitMQ 服务器地址 | `"localhost"` | +| | `port` | AMQP over TLS 端口 | `5671` | +| | `server_name` | TLS SNI / 证书 CN | `"rabbitmq-server"` | +| | `ca_cert_path` | CA 证书路径 | `"./configs/certs/ca_certificate.pem"` | +| | `client_cert_path` | 客户端证书路径 | `"./configs/certs/eventrt_client_cert.pem"` | +| | `client_key_path` | 客户端私钥路径 | `"./configs/certs/eventrt_client_key.pem"` | +| | `insecure_skip_verify` | 是否跳过证书校验(开发临时用) | `false` | +| **Logger (Zap)** | `mode` | 日志模式 `development` / `production` | `"development"` | +| | `level` | 最低日志级别 | `"debug"` | +| | `filepath` | 日志文件路径(空字符串输出到 stdout) | `""` | +| | `maxsize` | 单文件最大大小(MB) | `1` | +| | `maxbackups` | 保留旧文件最大个数 | `5` | +| | `maxage` | 保留旧文件最大天数 | `30` | +| | `compress` | 是否压缩备份文件 | `false` | +| **Service** | `service_addr` | HTTP 监听地址 | `":8081"` | +| | `service_name` | 服务名(日志/监控标识) | `"eventRT"` | +| | `secret_key` | 内部签名密钥 | `"eventrt_key"` | +| | `deploy_env` | 部署环境 `development` / `production` | `"development"` | +| **OTel** | `endpoint` | OTLP HTTP 上报地址(不含协议前缀) | `"localhost:4318"` | +| | `insecure` | 是否不启用 TLS | `true` | + +#### 5.2 编译 EventRT 服务 + +```bash +go build -o eventrt main.go +``` + +#### 5.3 启动服务 + +```bash +./eventrt +``` + +#### 5.4 检测服务启动日志 + +控制台输出 `starting EventRT server` 后即代表服务启动成功。 + +--- + +### 6\. 排查手册 + +#### 6.1 证书权限检查 + +确认客户端证书包含 `TLS Web Client Authentication`: + +```bash +openssl x509 -in configs/certs/eventrt_client_cert.pem -noout -text \ + | grep -A1 "Extended Key Usage" +``` + +预期输出包含 `TLS Web Client Authentication`。 + +#### 6.2 握手连通性验证 + +```bash +openssl s_client -connect localhost:5671 \ + -cert configs/certs/eventrt_client_cert.pem \ + -key configs/certs/eventrt_client_key.pem \ + -CAfile configs/certs/ca_certificate.pem +``` + +预期结果:看到 `Verify return code: 0 (ok)`。 + +#### 6.3 RabbitMQ 日志检查 + +连接成功后,RabbitMQ 日志应出现: + +``` +connection : user 'eventrt-client' authenticated and granted access to vhost '/' +``` + +查看 Pod 日志: + +```bash +kubectl logs -l app=rabbitmq --tail=50 | grep eventrt-client +``` + +#### 6.4 MongoDB 连通性验证 + +```bash +mongosh "mongodb://coslight:coslight@tj@localhost:27017/eventdb?authSource=admin" +``` + +预期进入 `mongosh` 提示符,执行 `show collections` 无报错。 + +--- + +### 7\. 后续操作(停止与清理) + +#### 7.1 本地 Docker 部署清理 + +适用于第 1 节使用 `docker run` 启动的 MongoDB 容器。 + +```bash +# 停止容器 +docker stop mongodb + +# 删除容器(容器内数据将同步丢失) +docker rm mongodb +``` + +#### 7.2 本地 go run 运行清理 + +适用于第 5 节以 `go run` 或编译后二进制方式在本地启动的 EventRT 服务。 + +前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程: + +```bash +# 终止 go run 启动的进程 +pkill -f "go run main.go" + +# 或终止编译后的二进制进程 +pkill eventrt +``` + +#### 7.3 K8s(Minikube) 部署清理 + +适用于第 1.2、2、3 节在 Minikube 中部署的所有资源。 + +##### 7.3.1 分服务清理 + +**仅停止(缩容至 0,PVC 数据与 Secret 保留)** + +将所有 Deployment 和 StatefulSet 缩容至 0 副本,Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。 + +```bash +# 停止所有 Deployment(EventRT / RabbitMQ) +kubectl scale deployment eventrt rabbitmq --replicas=0 + +# 停止 MongoDB StatefulSet(PVC 数据保留) +kubectl scale statefulset mongodb --replicas=0 +``` + +恢复时: + +```bash +kubectl scale deployment eventrt rabbitmq --replicas=1 +kubectl scale statefulset mongodb --replicas=1 +``` + +--- + +**永久清理(删除所有资源,数据不可恢复)** + +按部署顺序反向删除各服务资源: + +```bash +# EventRT 应用 +kubectl delete -f deploy/k8s/eventrt-service.yaml \ + -f deploy/k8s/eventrt-deployment.yaml \ + -f deploy/k8s/eventrt-configmap.yaml \ + -f deploy/k8s/eventrt-secret.yaml +kubectl delete secret eventrt-certs + +# MongoDB +kubectl delete -f deploy/k8s/mongodb-service.yaml \ + -f deploy/k8s/mongodb-statefulset.yaml \ + -f deploy/k8s/mongodb-pvc.yaml \ + -f deploy/k8s/mongodb-secret.yaml + +# RabbitMQ +kubectl delete -f deploy/mq/rabbitmq-service.yaml \ + -f deploy/mq/rabbitmq-deployment.yaml \ + -f deploy/mq/rabbitmq-users-config.yaml \ + -f deploy/mq/rabbitmq-config.yaml \ + -f deploy/mq/rabbitmq-secret.yaml +kubectl delete secret rabbitmq-certs +``` + +##### 7.3.2 一键清理 + +> **注意:** 此操作会删除 `deploy/k8s/` 和 `deploy/mq/` 下所有 YAML 对应的 K8s 资源,请确认后执行。 + +```bash +kubectl delete -f deploy/k8s/ -f deploy/mq/ +kubectl delete secret eventrt-certs rabbitmq-certs +``` diff --git a/deploy/dockerfile/eventrt.Dockerfile b/deploy/dockerfile/eventrt.Dockerfile new file mode 100644 index 0000000..1e67cb6 --- /dev/null +++ b/deploy/dockerfile/eventrt.Dockerfile @@ -0,0 +1,34 @@ +FROM golang:1.26-alpine AS builder +RUN apk --no-cache upgrade + +WORKDIR /app +COPY go.mod go.sum ./ +RUN GOPROXY="https://goproxy.cn,direct" go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build \ + -ldflags="-s -w" \ + -trimpath \ + -mod=readonly \ + -o eventrt main.go + +# prepare runtime dependencies in a pinned alpine stage so they can be +# copied into scratch without pulling any vulnerable os packages at run time. +FROM alpine:3.21 AS certs +ARG USER_ID=1000 +RUN apk --no-cache add ca-certificates tzdata && \ + adduser -D -u ${USER_ID} eventrt + +FROM scratch +# CA certificates required for TLS connections (RabbitMQ amqps://) +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +# timezone data +COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo +# non-root user/group definitions +COPY --from=certs /etc/passwd /etc/passwd +COPY --from=certs /etc/group /etc/group + +WORKDIR /app +COPY --from=builder /app/eventrt ./eventrt + +USER eventrt +CMD ["/app/eventrt", "-eventRT_config_dir=/app/configs"] diff --git a/deploy/k8s/eventrt-certs-secret.sh b/deploy/k8s/eventrt-certs-secret.sh new file mode 100755 index 0000000..218d404 --- /dev/null +++ b/deploy/k8s/eventrt-certs-secret.sh @@ -0,0 +1,14 @@ +#!/bin/sh +# Create the eventrt client certificate secret. +# Run this script from the directory that contains the three cert files, +# or adjust the paths below to point at the actual files. +# +# Expected files (generated during RabbitMQ TLS setup): +# ca_certificate.pem +# eventrt_client_cert.pem +# eventrt_client_key.pem + +kubectl create secret generic eventrt-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=eventrt_client_cert.pem=./eventrt_client_cert.pem \ + --from-file=eventrt_client_key.pem=./eventrt_client_key.pem diff --git a/deploy/k8s/eventrt-configmap.yaml b/deploy/k8s/eventrt-configmap.yaml new file mode 100644 index 0000000..5375875 --- /dev/null +++ b/deploy/k8s/eventrt-configmap.yaml @@ -0,0 +1,47 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: eventrt-config +data: + config.yaml: | + rabbitmq: + ca_cert_path: "/app/configs/certs/ca_certificate.pem" + client_key_path: "/app/configs/certs/eventrt_client_key.pem" + client_key_password: "" + client_cert_path: "/app/configs/certs/eventrt_client_cert.pem" + insecure_skip_verify: false + server_name: "rabbitmq-server" + user: "" + password: "" + host: "rabbitmq-service" + port: 5671 + + mongodb: + host: "mongodb-service" + port: 27017 + user: "admin" + password: "" # injected via env MONGODB_PASSWORD + database: "eventdb" + auth_db: "admin" + timeout: 10 + + logger: + mode: "production" + level: "info" + filepath: "" + maxsize: 100 + maxbackups: 5 + maxage: 30 + compress: false + loki: + endpoint: "" # Promtail handles log collection in K8s, direct push disabled + + service: + service_addr: ":8081" + service_name: "eventRT" + secret_key: "" # injected via env SERVICE_SECRET_KEY + deploy_env: "development" + + otel: + endpoint: "jaeger-service:4318" + insecure: true diff --git a/deploy/k8s/eventrt-deployment.yaml b/deploy/k8s/eventrt-deployment.yaml new file mode 100644 index 0000000..ebfa3e2 --- /dev/null +++ b/deploy/k8s/eventrt-deployment.yaml @@ -0,0 +1,91 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: eventrt + labels: + app: eventrt +spec: + replicas: 1 + selector: + matchLabels: + app: eventrt + template: + metadata: + labels: + app: eventrt + spec: + containers: + - name: eventrt + image: eventrt:v1 + imagePullPolicy: IfNotPresent + command: ["/app/eventrt"] + args: + - "-eventRT_config_dir=/app/configs" + - "-eventRT_config_name=config" + - "-eventRT_config_type=yaml" + ports: + - containerPort: 8081 + env: + # Downward API — injected into every log line by logger + - name: K8S_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: K8S_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + # HOSTNAME is set automatically by K8s to the pod name + # Sensitive values injected from Secret so they stay out of ConfigMap + - name: MONGODB_PASSWORD + valueFrom: + secretKeyRef: + name: eventrt-secret + key: mongodb-password + - name: SERVICE_SECRET_KEY + valueFrom: + secretKeyRef: + name: eventrt-secret + key: secret-key + volumeMounts: + - name: config + mountPath: /app/configs/config.yaml + subPath: config.yaml + readOnly: true + - name: certs + mountPath: /app/configs/certs + readOnly: true + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + securityContext: + runAsUser: 1000 + runAsNonRoot: true + readOnlyRootFilesystem: true + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + livenessProbe: + tcpSocket: + port: 8081 + initialDelaySeconds: 10 + periodSeconds: 30 + failureThreshold: 3 + readinessProbe: + tcpSocket: + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 10 + failureThreshold: 3 + volumes: + - name: config + configMap: + name: eventrt-config + - name: certs + secret: + secretName: eventrt-certs diff --git a/deploy/k8s/eventrt-secret.yaml b/deploy/k8s/eventrt-secret.yaml new file mode 100644 index 0000000..9d176c5 --- /dev/null +++ b/deploy/k8s/eventrt-secret.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: eventrt-secret +type: Opaque +stringData: + mongodb-password: "coslight" + secret-key: "eventrt_key" diff --git a/deploy/k8s/eventrt-service.yaml b/deploy/k8s/eventrt-service.yaml new file mode 100644 index 0000000..25c0454 --- /dev/null +++ b/deploy/k8s/eventrt-service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: eventrt-service + labels: + app: eventrt +spec: + type: NodePort + selector: + app: eventrt + ports: + - name: http + port: 8081 + targetPort: 8081 + nodePort: 30081 diff --git a/deploy/k8s/mongodb-pvc.yaml b/deploy/k8s/mongodb-pvc.yaml new file mode 100644 index 0000000..d009b0a --- /dev/null +++ b/deploy/k8s/mongodb-pvc.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: mongodb-data +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi diff --git a/deploy/k8s/mongodb-secret.yaml b/deploy/k8s/mongodb-secret.yaml new file mode 100644 index 0000000..53363f2 --- /dev/null +++ b/deploy/k8s/mongodb-secret.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: mongodb-secret +type: Opaque +stringData: + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: coslight diff --git a/deploy/k8s/mongodb-service.yaml b/deploy/k8s/mongodb-service.yaml new file mode 100644 index 0000000..8345287 --- /dev/null +++ b/deploy/k8s/mongodb-service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: mongodb-service + labels: + app: mongodb +spec: + type: NodePort + selector: + app: mongodb + ports: + - name: mongodb + port: 27017 + targetPort: 27017 + nodePort: 30017 diff --git a/deploy/k8s/mongodb-statefulset.yaml b/deploy/k8s/mongodb-statefulset.yaml new file mode 100644 index 0000000..708caa6 --- /dev/null +++ b/deploy/k8s/mongodb-statefulset.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: mongodb + labels: + app: mongodb +spec: + serviceName: mongodb + replicas: 1 + selector: + matchLabels: + app: mongodb + template: + metadata: + labels: + app: mongodb + spec: + containers: + - name: mongodb + image: mongo:7.0 + imagePullPolicy: IfNotPresent + ports: + - name: mongodb + containerPort: 27017 + envFrom: + - secretRef: + name: mongodb-secret + volumeMounts: + - name: mongodb-data + mountPath: /data/db + readinessProbe: + exec: + command: + - mongosh + - --eval + - "db.adminCommand('ping')" + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 10 + failureThreshold: 12 + livenessProbe: + exec: + command: + - mongosh + - --eval + - "db.adminCommand('ping')" + initialDelaySeconds: 120 + periodSeconds: 10 + timeoutSeconds: 30 + failureThreshold: 5 + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + volumes: + - name: mongodb-data + persistentVolumeClaim: + claimName: mongodb-data diff --git a/deploy/mq/client.conf b/deploy/mq/client.conf new file mode 100644 index 0000000..74244bf --- /dev/null +++ b/deploy/mq/client.conf @@ -0,0 +1,14 @@ +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = web-client + +[v3_client] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth diff --git a/deploy/mq/eventrt.conf b/deploy/mq/eventrt.conf new file mode 100644 index 0000000..c326b54 --- /dev/null +++ b/deploy/mq/eventrt.conf @@ -0,0 +1,14 @@ +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = eventrt-client + +[v3_client] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth diff --git a/deploy/mq/modelrt.conf b/deploy/mq/modelrt.conf new file mode 100644 index 0000000..2e4ada9 --- /dev/null +++ b/deploy/mq/modelrt.conf @@ -0,0 +1,14 @@ +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = modelrt-client + +[v3_client] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth diff --git a/deploy/mq/plugins.sh b/deploy/mq/plugins.sh new file mode 100644 index 0000000..f40866e --- /dev/null +++ b/deploy/mq/plugins.sh @@ -0,0 +1 @@ +kubectl create configmap rabbit-plugins-conf --from-literal=enabled_plugins="[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch]." \ No newline at end of file diff --git a/deploy/mq/rabbitmq-config.yaml b/deploy/mq/rabbitmq-config.yaml new file mode 100644 index 0000000..a5cbad7 --- /dev/null +++ b/deploy/mq/rabbitmq-config.yaml @@ -0,0 +1,33 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-config +data: + rabbitmq.conf: | + # 确保允许PLAIN认证 + auth_mechanisms.1 = PLAIN + auth_mechanisms.2 = AMQPLAIN + auth_mechanisms.3 = EXTERNAL + # 允许admin用户通过远程方式连接 + loopback_users.admin = false + # 默认心跳和监听配置可在此扩展 + # 确定 ssl 连接时验证使用的用户名 + ssl_cert_login_from = common_name + # 开启此项配置会导致只能通过TLS端口访问 + listeners.tcp = none + listeners.ssl.default = 5671 + # default user config + load_definitions = /etc/rabbitmq/definitions.json + # ssl config + ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem + ssl_options.certfile = /etc/rabbitmq/certs/server_certificate.pem + ssl_options.keyfile = /etc/rabbitmq/certs/server_key.pem + ssl_options.verify = verify_peer + ssl_options.fail_if_no_peer_cert = true + # management config + management.ssl.port = 15671 + management.ssl.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem + management.ssl.certfile = /etc/rabbitmq/certs/server_certificate.pem + management.ssl.keyfile = /etc/rabbitmq/certs/server_key.pem + management.ssl.verify = verify_peer + management.ssl.fail_if_no_peer_cert = true diff --git a/deploy/mq/rabbitmq-deployment.yaml b/deploy/mq/rabbitmq-deployment.yaml new file mode 100644 index 0000000..2b741bd --- /dev/null +++ b/deploy/mq/rabbitmq-deployment.yaml @@ -0,0 +1,76 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: eventrt-rabbitmq +spec: + replicas: 1 + selector: + matchLabels: + app: rabbitmq + template: + metadata: + labels: + app: rabbitmq + spec: + containers: + - name: rabbitmq + image: rabbitmq:4.1.1-management-alpine + imagePullPolicy: IfNotPresent + ports: + - containerPort: 4369 + - containerPort: 5671 + - containerPort: 5672 # AMQP + - containerPort: 15671 + - containerPort: 15672 # Management UI + - containerPort: 15691 + - containerPort: 15692 + - containerPort: 25672 + env: + - name: RABBITMQ_DEFAULT_USER + valueFrom: + secretKeyRef: + name: rabbitmq-secret + key: rabbitmq-user + - name: RABBITMQ_DEFAULT_PASS + valueFrom: + secretKeyRef: + name: rabbitmq-secret + key: rabbitmq-pass + - name: RABBITMQ_ERLANG_COOKIE + valueFrom: + secretKeyRef: + name: rabbitmq-secret + key: erlang-cookie + - name: RABBITMQ_DEFAULT_VHOST + value: "/" + volumeMounts: + - name: rabbitmq-certs-volume + mountPath: /etc/rabbitmq/certs + readOnly: true + - name: rabbitmq-config-volume + mountPath: /etc/rabbitmq/rabbitmq.conf + subPath: rabbitmq.conf + readOnly: true + - name: plugins-config-volume + mountPath: /etc/rabbitmq/enabled_plugins + subPath: enabled_plugins + - name: users-config-volume + mountPath: /etc/rabbitmq/definitions.json + subPath: definitions.json + - name: rabbitmq-data + mountPath: /var/lib/rabbitmq + volumes: + - name: rabbitmq-certs-volume + secret: + secretName: rabbitmq-certs + - name: rabbitmq-config-volume + configMap: + name: rabbitmq-config + - name: plugins-config-volume + configMap: + name: rabbit-plugins-conf + - name: users-config-volume + configMap: + name: rabbitmq-users-definitions + - name: rabbitmq-data + emptyDir: {} diff --git a/deploy/mq/rabbitmq-secret.yaml b/deploy/mq/rabbitmq-secret.yaml new file mode 100644 index 0000000..eae46a1 --- /dev/null +++ b/deploy/mq/rabbitmq-secret.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: Secret +metadata: + name: rabbitmq-secret +type: Opaque +stringData: + rabbitmq-user: "coslight" + rabbitmq-pass: "coslight@tj" + erlang-cookie: "secret-erlang-cookie" diff --git a/deploy/mq/rabbitmq-service.yaml b/deploy/mq/rabbitmq-service.yaml new file mode 100644 index 0000000..6cdb259 --- /dev/null +++ b/deploy/mq/rabbitmq-service.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq-service +spec: + type: NodePort # 在 Minikube 中使用 NodePort 方便外部访问 + selector: + app: rabbitmq + ports: + - name: amqp-ssl + protocol: TCP + port: 5671 + targetPort: 5671 + nodePort: 30671 + - name: amqp + protocol: TCP + port: 5672 + targetPort: 5672 + nodePort: 30672 + - name: management-ssl + protocol: TCP + port: 15671 + targetPort: 15671 + nodePort: 31671 + - name: management + protocol: TCP + port: 15672 + targetPort: 15672 + nodePort: 31672 diff --git a/deploy/mq/rabbitmq-users-config.yaml b/deploy/mq/rabbitmq-users-config.yaml new file mode 100644 index 0000000..8de5f30 --- /dev/null +++ b/deploy/mq/rabbitmq-users-config.yaml @@ -0,0 +1,77 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-users-definitions +data: + definitions.json: | + { + "users": [ + { + "name": "coslight", + "password_hash": "Gl2XVEJwPwDZQF8ZhsYnvm83wMkdftY3/raxyntdZueyx/Uv", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": ["administrator"] + }, + { + "name": "web-client", + "password_hash": "", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": ["management"] + }, + { + "name": "modelrt-client", + "password_hash": "", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": ["management"] + }, + { + "name": "eventrt-client", + "password_hash": "", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": ["management"] + } + ], + "vhosts": [ { "name": "/" } ], + "permissions": [ + { + "user": "coslight", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + }, + { + "user": "web-client", + "vhost": "/", + "configure": "^$", + "write": ".*", + "read": ".*" + }, + { + "user": "modelrt-client", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + }, + { + "user": "eventrt-client", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + } + ], + "topic_permissions": [], + "parameters": [], + "global_parameters": [ + { + "name": "cluster_name", + "value": "evnetrt-rabbitmq-cluster" + } + ], + "policies": [], + "queues": [], + "exchanges": [], + "bindings": [] + } diff --git a/deploy/mq/secert.sh b/deploy/mq/secert.sh new file mode 100644 index 0000000..40cf1e3 --- /dev/null +++ b/deploy/mq/secert.sh @@ -0,0 +1,4 @@ +kubectl create secret generic rabbitmq-certs \ + --from-file=ca_certificate.pem=./certs/ca_certificate.pem \ + --from-file=server_certificate.pem=./certs/server_certificate.pem \ + --from-file=server_key.pem=./certs/server_key.pem \ No newline at end of file diff --git a/deploy/mq/server.conf b/deploy/mq/server.conf new file mode 100644 index 0000000..cf57908 --- /dev/null +++ b/deploy/mq/server.conf @@ -0,0 +1,22 @@ +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = rabbitmq-server + +[v3_server] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth, clientAuth +subjectAltName = @alt_names + +[alt_names] +DNS.1 = rabbitmq-server +DNS.2 = rabbitmq-service.default.svc.cluster.local +DNS.3 = localhost +IP.1 = 192.168.49.2 +IP.2 = 127.0.0.1 diff --git a/docs/event-flow-analysis.md b/docs/event-flow-analysis.md new file mode 100644 index 0000000..ab276ea --- /dev/null +++ b/docs/event-flow-analysis.md @@ -0,0 +1,141 @@ +# eventRT 事件流转与存储分析 + +## 一、整体架构 + +```bash +modelRT 服务 + │ + │ 发布事件 (AMQPS + mTLS) + ▼ +RabbitMQ (exchange: event-exchange) + │ routing key: event.#.updown.* + ▼ +Queue: event-up-down-queue + │ + ▼ +eventRT (消费者) + │ + │ 解析 + 写入 + ▼ +MongoDB (eventdb.alarms) +``` + +--- + +## 二、事件数据结构 (`event/event.go`) + +| 字段 | 类型 | 说明 | +| :--- | :--- | :--- | +| `event` | string | 事件名称 | +| `event_uuid` | string | 唯一标识符 | +| `type` | int | 事件类型 | +| `priority` | int | 优先级 0-9 | +| `status` | int | 事件状态(见第四节) | +| `category` | string | 可选模板参数 | +| `timestamp` | int64 | 毫秒级 Unix 时间戳 | +| `from` | string | 来源:station / platform / msa | +| `condition` | map | 触发条件(如阈值、当前值) | +| `attached_subscriptions` | []any | 关联订阅信息 | +| `result` | map | 事件分析结果 | +| `operations` | []OperationRecord | 操作历史(见第四节) | +| `origin` | map | 子站原始告警数据 (CIM Alarm) | + +`OperationRecord` 结构: + +```go +Action string // 动作类型,如 "acknowledgment" +Op string // 操作人标识 +TS int64 // 操作时间(毫秒) +``` + +--- + +## 三、事件流转流程 (`event/up_down_limit_alarm.go`) + +```bash +main.go 启动 + └─ go event.ReceiptUpDownLimitAlarm(ctx) // goroutine 异步运行 + +ReceiptUpDownLimitAlarm(): + 1. GetConn() // 获取全局 RabbitMQ 连接 + 2. conn.Channel() // 创建 Channel + 3. channel.QueueBind( // 绑定路由 + queue = "event-up-down-queue", + exchange = "event-exchange", + key = "event.#.updown.*" + ) + 4. channel.Qos(1, 0, false) // 每次只预取 1 条,防止积压 + 5. channel.Consume(autoAck=false) // 手动 ACK 模式 + 6. for { select msg / ctx.Done() } // 事件循环 + +processAlarmEventMessage(msg): + 1. json.Unmarshal → EventRecord // 结构体反序列化 + 2. InsertOne(ctx, alarmEvent) // ⚠️ 第一次写入(结构体方式) + 3. bson.UnmarshalExtJSON → bson.D // 原始 BSON 反序列化 + 4. InsertOne(ctx, doc) // 第二次写入(原始文档方式) + 5. msg.Ack(false) // 手动确认消息 +``` + +--- + +## 四、事件状态流转 + +**现状:`Status` 字段有定义,但无状态常量,无状态机逻辑。** + +从 `EventRecord.Status int` + `Operations []OperationRecord` 的设计意图推断,其预期的状态流转应为: + +```bash +[未定义初始态] + │ + │ 由 modelRT 发布事件写入 + ▼ + status = ? (活跃/未确认) + │ + │ Operations 中追加 {action: "acknowledgment", op: 用户, ts: 时间} + ▼ + status = ? (已确认) + │ + │ 后续处理(result 填充等) + ▼ + status = ? (已关闭/已解决) +``` + +目前**状态码值没有任何枚举常量定义**,`constants/` 包下只有 trace、log_mode、deploy_mode,状态机逻辑尚未实现(代码有多处 `TODO`)。 + +--- + +## 五、存储方式 + +- **数据库**:MongoDB +- **库/集合**:`eventdb` / `alarms` +- **写入时机**:消息到达后立即插入,无缓冲 + +**当前代码存在一个明显问题**(`up_down_limit_alarm.go:99-115`): + +```go +// 第一次:以结构体插入(会被 Go 零值污染,如 omitempty 未设置的字段) +mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent) + +// 第二次:以原始 BSON 插入(保留原始字段,是更合理的方式) +mongodbClient.Database(dbName).Collection(collectionName).InsertOne(ctx, doc) +``` + +每条消息会**插入两份文档**到 `eventdb.alarms`,且 `msg.Ack` 只在第二次成功后才调用。这是个明显的 TODO 遗留问题,第一次插入应当被删除。 + +--- + +## 六、连接可靠性设计 + +RabbitMQ 连接有自动重连机制(`mq/mq_init.go:82`):断连后每 5 秒重试一次,直到 context 取消。MongoDB 目前无重连逻辑,直接依赖驱动内置能力。 + +--- + +## 七、总结 + +| 维度 | 现状 | +| :--- | :--- | +| 事件流转 | modelRT → RabbitMQ topic exchange → eventRT 消费 → MongoDB | +| 消息可靠性 | 手动 ACK + QoS=1 + 断连重连 | +| 存储方式 | MongoDB `eventdb.alarms`,目前每条消息重复写入两次(bug) | +| 状态流转 | `Status` + `Operations` 字段已建模,但状态常量和状态机**尚未实现** | +| 主要 TODO | 删除重复插入、定义状态枚举、实现状态流转逻辑、补充推送前端逻辑 | diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..0ab1007 --- /dev/null +++ b/event/event.go @@ -0,0 +1,41 @@ +// Package event define real time data evnet operation functions +package event + +// EventRecord define struct for CIM event record +type EventRecord struct { + // 事件名称 + EventName string `json:"event" bson:"event"` + // 事件唯一标识符 + EventUUID string `json:"event_uuid" bson:"event_uuid"` + // 事件类型 + Type int `json:"type" bson:"type"` + // 事件优先级 (0-9) + Priority int `json:"priority" bson:"priority"` + // 事件状态 + Status int `json:"status" bson:"status"` + // 是否已持久化到数据库,由 eventRT 消费并落库后置为 true + IsPersisted bool `json:"is_persisted" bson:"is_persisted"` + // 可选模板参数 + Category string `json:"category,omitempty" bson:"category,omitempty"` + // 毫秒级时间戳 (Unix epoch) + Timestamp int64 `json:"timestamp" bson:"timestamp"` + // 事件来源 (station, platform, msa) + From string `json:"from" bson:"from"` + // 事件场景描述对象 (如阈值、当前值) + Condition map[string]any `json:"condition" bson:"condition"` + // 与事件相关的订阅信息 + AttachedSubscriptions []any `json:"attached_subscriptions" bson:"attached_subscriptions"` + // 事件分析结果对象 + Result map[string]any `json:"result,omitempty" bson:"result,omitempty"` + // 操作历史记录 (CIM ActivityRecord) + Operations []OperationRecord `json:"operations" bson:"operations"` + // 子站告警原始数据 (CIM Alarm 数据) + Origin map[string]any `json:"origin,omitempty" bson:"origin,omitempty"` +} + +// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等 +type OperationRecord struct { + Action string `json:"action" bson:"action"` // 执行的动作,如 "acknowledgment" + Op string `json:"op" bson:"op"` // 操作人/操作账号标识 + TS int64 `json:"ts" bson:"ts"` // 操作发生的毫秒时间戳 +} diff --git a/event/publish_ui_event.go b/event/publish_ui_event.go new file mode 100644 index 0000000..436f207 --- /dev/null +++ b/event/publish_ui_event.go @@ -0,0 +1,81 @@ +// Package event define real time data evnet operation functions +package event + +import ( + "context" + "encoding/json" + "time" + + "eventRT/constants" + "eventRT/logger" + "eventRT/mq" + + amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel" +) + +// initUIEventChannel declares the fanout exchange and queue used by UI consumers, +// then returns a channel ready for publishing. +func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) { + ch, err := mq.GetConn().Channel() + if err != nil { + logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err) + return nil, err + } + + err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare UI event exchange failed", "error", err) + return nil, err + } + + _, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare UI event queue failed", "error", err) + return nil, err + } + + // fanout exchange routes to all bound queues regardless of routing key + err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil) + if err != nil { + logger.Error(ctx, "bind UI event queue to exchange failed", "error", err) + return nil, err + } + + return ch, nil +} + +// PublishEventToUI sets the event status to Reported and publishes the record +// to the UI-facing fanout exchange. Intended to be called after the event has +// been successfully persisted to the database. +func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error { + body, err := json.Marshal(record) + if err != nil { + logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err) + return err + } + + headers := amqp.Table{} + otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers)) + + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err = ch.PublishWithContext(pubCtx, + constants.EventUIExchangeName, + "", // fanout exchange ignores routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Headers: headers, + }) + if err != nil { + logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err) + return err + } + + logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status) + return nil +} diff --git a/event/up_down_limit_alarm.go b/event/up_down_limit_alarm.go new file mode 100644 index 0000000..84f711a --- /dev/null +++ b/event/up_down_limit_alarm.go @@ -0,0 +1,153 @@ +// Package event define real time data evnet operation functions +package event + +import ( + "context" + "encoding/json" + + "eventRT/constants" + "eventRT/database" + "eventRT/logger" + "eventRT/mq" + + amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" +) + +// amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier for trace context extraction. +type amqpHeaderCarrier amqp.Table + +func (c amqpHeaderCarrier) Get(key string) string { + val, ok := amqp.Table(c)[key] + if !ok { + return "" + } + str, _ := val.(string) + return str +} + +func (c amqpHeaderCarrier) Set(key, value string) { + amqp.Table(c)[key] = value +} + +func (c amqpHeaderCarrier) Keys() []string { + keys := make([]string, 0, len(c)) + for k := range c { + keys = append(keys, k) + } + return keys +} + +var _ propagation.TextMapCarrier = amqpHeaderCarrier{} + +const ( + queueName = "event-up-down-queue" + exchangeName = "event-exchange" +) + +// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm +func ReceiptUpDownLimitAlarm(ctx context.Context) { + conn := mq.GetConn() + if conn == nil { + logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed") + return + } + + channel, err := conn.Channel() + if err != nil { + logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err) + return + } + defer channel.Close() + + uiChannel, err := initUIEventChannel(ctx) + if err != nil { + logger.Error(ctx, "init UI event channel failed", "error", err) + return + } + defer uiChannel.Close() + + err = channel.QueueBind( + queueName, // 队列名 + "event.#.updown.*", + exchangeName, // 交换机名 + false, + nil, + ) + if err != nil { + logger.Error(ctx, "channel bind queue and exchange failed", "error", err) + return + } + + err = channel.Qos(1, 0, false) + if err != nil { + logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err) + } + + // registered consumer + msgs, err := channel.Consume( + queueName, // queue + "", // consumer tag + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + logger.Error(ctx, "failed to register a consumer", "error", err) + return + } + + logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ") + + for { + select { + case <-ctx.Done(): + logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel") + return + case msg, ok := <-msgs: + if !ok { + logger.Error(ctx, "message channel closed, exiting consumer loop") + return + } + processAlarmEventMessage(ctx, msg, uiChannel) + } + } +} + +func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) { + // extract upstream trace context injected by modelRT into AMQP headers + ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers)) + ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage") + defer span.End() + + logger.Info(ctx, "received event alarm from modelRT up and down limit compute process") + mongodbClient := database.GetMongoClient() + + var alarmEvent EventRecord + if err := json.Unmarshal(msg.Body, &alarmEvent); err != nil { + logger.Error(ctx, "unmarshal alarm event message failed", "error", err) + return + } + + alarmEvent.IsPersisted = true + alarmEvent.Status = constants.EventStatusReported + + result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent) + if err != nil { + logger.Error(ctx, "failed to insert alarm event into database", "error", err) + return + } + logger.Info(ctx, "alarm event inserted into database", "result", result) + + if err := msg.Ack(false); err != nil { + logger.Error(ctx, "ack alarm event message failed", "error", err) + return + } + + if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil { + logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ab36524 --- /dev/null +++ b/go.mod @@ -0,0 +1,83 @@ +module eventRT + +go 1.26.3 + +require ( + github.com/gin-gonic/gin v1.11.0 + github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/spf13/viper v1.21.0 + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 + go.mongodb.org/mongo-driver v1.17.9 + go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/trace v1.43.0 + go.uber.org/zap v1.27.1 +) + +require ( + github.com/BurntSushi/toml v1.6.0 // indirect + github.com/bytedance/sonic v1.14.0 // indirect + github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.27.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/goccy/go-yaml v1.18.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/quic-go/qpack v0.5.1 // indirect + github.com/quic-go/quic-go v0.54.0 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.3.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect + go.uber.org/mock v0.5.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/arch v0.20.0 // indirect + golang.org/x/crypto v0.49.0 // indirect + golang.org/x/mod v0.33.0 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect + golang.org/x/tools v0.42.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/grpc v1.80.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7c1fd03 --- /dev/null +++ b/go.sum @@ -0,0 +1,215 @@ +github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= +github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= +github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= +github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= +github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= +github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= +github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk= +github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4= +github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= +github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM= +github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= +github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= +github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= +github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= +github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA= +github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.9 h1:IexDdCuuNJ3BHrELgBlyaH9p60JXAvdzWR128q+U5tU= +go.mongodb.org/mongo-driver v1.17.9/go.mod h1:LlOhpH5NUEfhxcAwG0UEkMqwYcc4JU18gtCdGudk/tQ= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c= +golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler/event_query.go b/handler/event_query.go new file mode 100644 index 0000000..da4014f --- /dev/null +++ b/handler/event_query.go @@ -0,0 +1,40 @@ +// Package handler define HTTP handler functions for eventRT service +package handler + +import ( + "errors" + "net/http" + + "eventRT/constants" + "eventRT/database" + "eventRT/event" + "eventRT/logger" + + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// GetEventHandler handles GET /events/:event_uuid +func GetEventHandler(c *gin.Context) { + ctx := c.Request.Context() + eventUUID := c.Param("event_uuid") + + var record event.EventRecord + err := database.GetMongoClient(). + Database(constants.EventDBName). + Collection(constants.EventCollectionName). + FindOne(ctx, bson.M{"event_uuid": eventUUID}). + Decode(&record) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) + return + } + logger.Error(ctx, "query event by uuid failed", "event_uuid", eventUUID, "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"}) + return + } + + c.JSON(http.StatusOK, record) +} diff --git a/handler/event_status.go b/handler/event_status.go new file mode 100644 index 0000000..f2c28ed --- /dev/null +++ b/handler/event_status.go @@ -0,0 +1,133 @@ +// Package handler define HTTP handler functions for eventRT service +package handler + +import ( + "context" + "errors" + "net/http" + "time" + + "eventRT/constants" + "eventRT/database" + "eventRT/event" + "eventRT/logger" + + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var ( + errEventNotFound = errors.New("event not found") + errInvalidStatusTransition = errors.New("invalid status transition: precondition status mismatch") +) + +type updateStatusRequest struct { + Op string `json:"op" binding:"required"` +} + +// ConfirmEventHandler handles PATCH /events/:event_uuid/confirm +// Transitions event status from EventStatusReported to EventStatusConfirmed. +func ConfirmEventHandler(c *gin.Context) { + handleStatusTransition(c, + constants.EventStatusReported, + constants.EventStatusConfirmed, + "confirm", + ) +} + +// CloseEventHandler handles PATCH /events/:event_uuid/close +// Transitions event status from EventStatusConfirmed to EventStatusClosed. +func CloseEventHandler(c *gin.Context) { + handleStatusTransition(c, + constants.EventStatusConfirmed, + constants.EventStatusClosed, + "close", + ) +} + +func handleStatusTransition(c *gin.Context, requiredStatus, newStatus int, action string) { + ctx := c.Request.Context() + eventUUID := c.Param("event_uuid") + + var req updateStatusRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "field op is required"}) + return + } + + updated, err := transitionEventStatus(ctx, eventUUID, req.Op, requiredStatus, newStatus, action) + if err != nil { + switch { + case errors.Is(err, errEventNotFound): + c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) + case errors.Is(err, errInvalidStatusTransition): + c.JSON(http.StatusConflict, gin.H{ + "error": "invalid status transition", + "required_status": requiredStatus, + }) + default: + logger.Error(ctx, "update event status failed", + "event_uuid", eventUUID, + "action", action, + "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"}) + } + return + } + + c.JSON(http.StatusOK, updated) +} + +// transitionEventStatus atomically validates the precondition status and applies the transition. +// Uses FindOneAndUpdate with a status filter so the check and update are a single MongoDB operation. +func transitionEventStatus(ctx context.Context, eventUUID, op string, requiredStatus, newStatus int, action string) (*event.EventRecord, error) { + col := database.GetMongoClient(). + Database(constants.EventDBName). + Collection(constants.EventCollectionName) + + opRecord := event.OperationRecord{ + Action: action, + Op: op, + TS: time.Now().UnixNano() / int64(time.Millisecond), + } + + filter := bson.M{ + "event_uuid": eventUUID, + "status": requiredStatus, + } + update := bson.M{ + "$set": bson.M{"status": newStatus}, + "$push": bson.M{"operations": opRecord}, + } + opts := options.FindOneAndUpdate().SetReturnDocument(options.After) + + var updated event.EventRecord + err := col.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updated) + if err != nil { + if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err + } + // FindOneAndUpdate returned no documents: either the event doesn't exist + // or the current status doesn't match the required precondition. + // Do a follow-up read to distinguish the two cases. + var existing event.EventRecord + findErr := col.FindOne(ctx, bson.M{"event_uuid": eventUUID}).Decode(&existing) + if errors.Is(findErr, mongo.ErrNoDocuments) { + return nil, errEventNotFound + } + if findErr != nil { + return nil, findErr + } + return nil, errInvalidStatusTransition + } + + logger.Info(ctx, "event status transitioned", + "event_uuid", eventUUID, + "from", requiredStatus, + "to", newStatus, + "op", op) + + return &updated, nil +} diff --git a/logger/facede.go b/logger/facede.go new file mode 100644 index 0000000..cf88ccf --- /dev/null +++ b/logger/facede.go @@ -0,0 +1,73 @@ +// Package logger define log struct of eventRT project +package logger + +import ( + "context" + "sync" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var ( + f *facade + fOnce sync.Once +) + +type facade struct { + _logger *zap.Logger +} + +// Debug define facade func of debug level log +func Debug(ctx context.Context, msg string, kv ...any) { + logFacade().log(ctx, zapcore.DebugLevel, msg, kv...) +} + +// Info define facade func of info level log +func Info(ctx context.Context, msg string, kv ...any) { + logFacade().log(ctx, zapcore.InfoLevel, msg, kv...) +} + +// Warn define facade func of warn level log +func Warn(ctx context.Context, msg string, kv ...any) { + logFacade().log(ctx, zapcore.WarnLevel, msg, kv...) +} + +// Error define facade func of error level log +func Error(ctx context.Context, msg string, kv ...any) { + logFacade().log(ctx, zapcore.ErrorLevel, msg, kv...) +} + +func (f *facade) log(ctx context.Context, lvl zapcore.Level, msg string, kv ...any) { + f.logSkip(ctx, lvl, 0, msg, kv...) +} + +func (f *facade) logSkip(ctx context.Context, lvl zapcore.Level, extraSkip int, msg string, kv ...any) { + fields := makeLogFieldsSkip(ctx, extraSkip, kv...) + ce := f._logger.Check(lvl, msg) + ce.Write(fields...) +} + +// ErrorSkip logs at error level with extra caller skip frames for wrapper functions. +func ErrorSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.ErrorLevel, extraSkip, msg, kv...) +} + +// WarnSkip logs at warn level with extra caller skip frames for wrapper functions. +func WarnSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.WarnLevel, extraSkip, msg, kv...) +} + +// InfoSkip logs at info level with extra caller skip frames for wrapper functions. +func InfoSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.InfoLevel, extraSkip, msg, kv...) +} + +func logFacade() *facade { + fOnce.Do(func() { + f = &facade{ + _logger: GetLoggerInstance(), + } + }) + return f +} diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..862216f --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,114 @@ +// Package logger define log struct of eventRT project +package logger + +import ( + "context" + "path" + "runtime" + + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Logger is the interface returned by New for structured, trace-aware logging. +type Logger interface { + Debug(msg string, kv ...any) + Info(msg string, kv ...any) + Warn(msg string, kv ...any) + Error(msg string, kv ...any) +} + +type logger struct { + ctx context.Context + _logger *zap.Logger +} + +func (l *logger) Debug(msg string, kv ...any) { + l.log(zapcore.DebugLevel, msg, kv...) +} + +func (l *logger) Info(msg string, kv ...any) { + l.log(zapcore.InfoLevel, msg, kv...) +} + +func (l *logger) Warn(msg string, kv ...any) { + l.log(zapcore.WarnLevel, msg, kv...) +} + +func (l *logger) Error(msg string, kv ...any) { + l.log(zapcore.ErrorLevel, msg, kv...) +} + +func (l *logger) log(lvl zapcore.Level, msg string, kv ...any) { + fields := makeLogFields(l.ctx, kv...) + ce := l._logger.Check(lvl, msg) + ce.Write(fields...) +} + +func makeLogFields(ctx context.Context, kv ...any) []zap.Field { + return makeLogFieldsSkip(ctx, 0, kv...) +} + +func makeLogFieldsSkip(ctx context.Context, extraSkip int, kv ...any) []zap.Field { + if len(kv)%2 != 0 { + kv = append(kv, "unknown") + } + + spanCtx := trace.SpanFromContext(ctx).SpanContext() + traceID := spanCtx.TraceID().String() + spanID := spanCtx.SpanID().String() + kv = append(kv, "traceID", traceID, "spanID", spanID) + + funcName, file, line := getLoggerCallerInfoSkip(extraSkip) + kv = append(kv, "func", funcName, "file", file, "line", line) + fields := make([]zap.Field, 0, len(kv)/2) + for i := 0; i < len(kv); i += 2 { + key := kv[i].(string) + value := kv[i+1] + switch v := value.(type) { + case string: + fields = append(fields, zap.String(key, v)) + case int: + fields = append(fields, zap.Int(key, v)) + case int64: + fields = append(fields, zap.Int64(key, v)) + case float32: + fields = append(fields, zap.Float32(key, v)) + case float64: + fields = append(fields, zap.Float64(key, v)) + case bool: + fields = append(fields, zap.Bool(key, v)) + case error: + fields = append(fields, zap.Error(v)) + default: + fields = append(fields, zap.Any(key, v)) + } + } + return fields +} + +// getLoggerCallerInfo returns caller info at a fixed depth for the standard facade call chain. +func getLoggerCallerInfo() (funcName, file string, line int) { + return getLoggerCallerInfoSkip(0) +} + +// getLoggerCallerInfoSkip returns caller info with additional skip frames beyond the standard depth. +func getLoggerCallerInfoSkip(extraSkip int) (funcName, file string, line int) { + pc, file, line, ok := runtime.Caller(4 + extraSkip) + if !ok { + return + } + file = path.Base(file) + funcName = runtime.FuncForPC(pc).Name() + return +} + +// New returns a logger bound to ctx. Trace fields (traceID, spanID) are extracted +// from the OTel span stored in ctx and included in every log entry. +func New(ctx context.Context) Logger { + return &logger{ + ctx: ctx, + _logger: GetLoggerInstance(), + } +} diff --git a/logger/loki_syncer.go b/logger/loki_syncer.go new file mode 100644 index 0000000..3792c2e --- /dev/null +++ b/logger/loki_syncer.go @@ -0,0 +1,132 @@ +// Package logger define log struct of eventRT project +package logger + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "maps" + "net/http" + "os" + "strconv" + "sync" + "time" + + "eventRT/config" +) + +type lokiPushRequest struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Stream map[string]string `json:"stream"` + Values [][2]string `json:"values"` +} + +// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them +// to Loki's push API asynchronously. Errors are silently dropped so a unreachable +// Loki instance never blocks or crashes the application. +type lokiSyncer struct { + endpoint string + labels map[string]string + client *http.Client + ch chan string + wg sync.WaitGroup + closeOnce sync.Once +} + +func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer { + // always tag development logs with env=development; caller-supplied labels override if needed + labels := map[string]string{"env": "development"} + maps.Copy(labels, lCfg.Labels) + ls := &lokiSyncer{ + endpoint: lCfg.Endpoint + "/loki/api/v1/push", + labels: labels, + client: &http.Client{Timeout: 5 * time.Second}, + ch: make(chan string, 512), + } + ls.wg.Add(1) + go ls.run() + return ls +} + +func (ls *lokiSyncer) Write(p []byte) (int, error) { + select { + case ls.ch <- string(p): + default: + // channel full: drop the line rather than block the caller + } + return len(p), nil +} + +// Sync flushes remaining buffered lines and shuts down the background goroutine. +// Called by zap.Logger.Sync() at application shutdown. +func (ls *lokiSyncer) Sync() error { + ls.closeOnce.Do(func() { close(ls.ch) }) + ls.wg.Wait() + return nil +} + +func (ls *lokiSyncer) run() { + defer ls.wg.Done() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + var batch []string + flush := func() { + if len(batch) == 0 { + return + } + ls.push(batch) + batch = batch[:0] + } + + for { + select { + case line, ok := <-ls.ch: + if !ok { + flush() + return + } + batch = append(batch, line) + if len(batch) >= 100 { + flush() + } + case <-ticker.C: + flush() + } + } +} + +func (ls *lokiSyncer) push(lines []string) { + ts := strconv.FormatInt(time.Now().UnixNano(), 10) + values := make([][2]string, len(lines)) + for i, line := range lines { + values[i] = [2]string{ts, line} + } + + body, err := json.Marshal(lokiPushRequest{ + Streams: []lokiStream{{Stream: ls.labels, Values: values}}, + }) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := ls.client.Do(req) + if err != nil { + fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err) + return + } + defer resp.Body.Close() +} diff --git a/logger/zap.go b/logger/zap.go new file mode 100644 index 0000000..c716ffb --- /dev/null +++ b/logger/zap.go @@ -0,0 +1,124 @@ +// Package logger define log struct of eventRT project +package logger + +import ( + "fmt" + "os" + "sync" + "time" + + "eventRT/config" + "eventRT/constants" + + "github.com/natefinch/lumberjack" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var ( + once sync.Once + _globalLoggerMu sync.RWMutex + _globalLogger *zap.Logger +) + +// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder +// for container modes (parseable by Promtail pipeline_stages). +func getEncoder(mode string) zapcore.Encoder { + cfg := zap.NewProductionEncoderConfig() + cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") + cfg.TimeKey = "time" + cfg.EncodeCaller = zapcore.ShortCallerEncoder + if mode == constants.DevelopmentLogMode { + cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder + return zapcore.NewConsoleEncoder(cfg) + } + cfg.EncodeLevel = zapcore.CapitalLevelEncoder + return zapcore.NewJSONEncoder(cfg) +} + +// getWriteSyncer returns write targets based on mode: +// - development: stdout + optional Loki direct-push (when loki.endpoint is set) +// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set) +func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer { + stdout := zapcore.AddSync(os.Stdout) + + if lCfg.Mode == constants.DevelopmentLogMode { + if lCfg.Loki.Endpoint == "" { + return stdout + } + return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki)) + } + + syncers := []zapcore.WriteSyncer{stdout} + if lCfg.FilePath != "" { + dateStr := time.Now().Format("2006-01-02 15:04:05") + syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{ + Filename: fmt.Sprintf(lCfg.FilePath, dateStr), + MaxSize: lCfg.MaxSize, + MaxAge: lCfg.MaxAge, + MaxBackups: lCfg.MaxBackups, + Compress: lCfg.Compress, + })) + } + return zapcore.NewMultiWriteSyncer(syncers...) +} + +// containerFields reads K8s Downward API environment variables and returns them as global zap fields. +// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node. +// Inject them in the Deployment manifest: +// +// env: +// - name: K8S_NAMESPACE +// valueFrom: {fieldRef: {fieldPath: metadata.namespace}} +// - name: K8S_NODE_NAME +// valueFrom: {fieldRef: {fieldPath: spec.nodeName}} +func containerFields() []zap.Field { + var fields []zap.Field + // HOSTNAME is automatically set to the pod name by Kubernetes. + if pod := os.Getenv("HOSTNAME"); pod != "" { + fields = append(fields, zap.String("pod", pod)) + } + if ns := os.Getenv("K8S_NAMESPACE"); ns != "" { + fields = append(fields, zap.String("namespace", ns)) + } + if node := os.Getenv("K8S_NODE_NAME"); node != "" { + fields = append(fields, zap.String("node", node)) + } + return fields +} + +// initLogger return successfully initialized zap logger +func initLogger(lCfg config.LoggerConfig) *zap.Logger { + writeSyncer := getWriteSyncer(lCfg) + encoder := getEncoder(lCfg.Mode) + + l := new(zapcore.Level) + if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil { + panic(err) + } + + core := zapcore.NewCore(encoder, writeSyncer, l) + opts := []zap.Option{zap.AddCaller()} + if lCfg.Mode != constants.DevelopmentLogMode { + opts = append(opts, zap.Fields(containerFields()...)) + } + logger := zap.New(core, opts...) + + zap.ReplaceGlobals(logger) + return logger +} + +// InitLoggerInstance define func of return instance of zap logger +func InitLoggerInstance(lCfg config.LoggerConfig) { + once.Do(func() { + _globalLogger = initLogger(lCfg) + }) +} + +// GetLoggerInstance define func of returns the global logger instance It's safe for concurrent use. +func GetLoggerInstance() *zap.Logger { + _globalLoggerMu.RLock() + logger := _globalLogger + _globalLoggerMu.RUnlock() + return logger +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..8e71591 --- /dev/null +++ b/main.go @@ -0,0 +1,148 @@ +// entry function +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "eventRT/config" + "eventRT/constants" + "eventRT/database" + "eventRT/event" + "eventRT/handler" + "eventRT/logger" + "eventRT/middleware" + "eventRT/mq" + "eventRT/util" + + "github.com/gin-gonic/gin" + "go.opentelemetry.io/otel" +) + +var ( + eventRTConfigDir = flag.String("eventRT_config_dir", "./configs", "config file dir") + eventRTConfigName = flag.String("eventRT_config_name", "config", "config file name") + eventRTConfigType = flag.String("eventRT_config_type", "yaml", "config file type") +) + +func main() { + flag.Parse() + + configPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+"."+*eventRTConfigType) + if _, err := os.Stat(configPath); os.IsNotExist(err) { + log.Println("configuration file not found,checking for example file") + + exampleConfigPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+".example."+*eventRTConfigType) + configDir := filepath.Dir(configPath) + if err := os.MkdirAll(configDir, 0o755); err != nil { + panic(fmt.Errorf("failed to create config directory %s:%w", configDir, err)) + } + if _, err := os.Stat(exampleConfigPath); err == nil { + if err := util.CopyFile(exampleConfigPath, configPath); err != nil { + panic(fmt.Errorf("failed to copy example config file:%w", err)) + } + } else { + panic(errors.New("no config file and no config example file found")) + } + } + + eventRTConfig := config.ReadAndInitConfig(*eventRTConfigDir, *eventRTConfigName, *eventRTConfigType) + // init logger instance + logger.InitLoggerInstance(eventRTConfig.LoggerConfig) + defer logger.GetLoggerInstance().Sync() + + // init OTel TracerProvider + tp, tpErr := middleware.InitTracerProvider(context.Background(), eventRTConfig) + if tpErr != nil { + log.Printf("warn: OTLP tracer init failed, tracing disabled: %v", tpErr) + } + if tp != nil { + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + tp.Shutdown(shutdownCtx) + }() + } + + notifyCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + notifyCtx, startupSpan := otel.Tracer("eventRT/main").Start(notifyCtx, "startup") + startupSpan.End() + + // init MongoDB client + client := database.InitMongoInstance(notifyCtx, eventRTConfig) + defer func() { + disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := client.Disconnect(disconnectCtx); err != nil { + logger.Error(notifyCtx, "mongodb disconnect failed", "err", err) + } else { + logger.Info(notifyCtx, "mongodb connection closed gracefully") + } + }() + + // init RabbitMQ connection + mq.InitRabbitProxy(notifyCtx, eventRTConfig.RabbitMQConfig) + defer mq.GetConn().Close() + + go event.ReceiptUpDownLimitAlarm(notifyCtx) + + // use release mode in production + if eventRTConfig.DeployEnv == constants.ProductionDeployMode { + gin.SetMode(gin.ReleaseMode) + } + engine := gin.New() + engine.Use(gin.Logger(), gin.Recovery()) + + // TODO k8s liveness & readiness probe endpoints + // engine.GET("/ping", func(c *gin.Context) { + // c.JSON(200, gin.H{"status": "ok"}) + // }) + + events := engine.Group("/events") + { + events.GET("/:event_uuid", handler.GetEventHandler) + events.PATCH("/:event_uuid/confirm", handler.ConfirmEventHandler) + events.PATCH("/:event_uuid/close", handler.CloseEventHandler) + } + + server := http.Server{ + Addr: eventRTConfig.ServiceAddr, + Handler: engine, + } + + go func() { + <-notifyCtx.Done() + ctx := context.Background() + logger.Info(ctx, "shutdown signal received, cleaning up...") + shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Error(shutdownCtx, "server shutdown failed", "err", err) + } + mq.CloseRabbitProxy() + logger.Info(ctx, "resources cleaned up, exiting") + }() + + logger.Info(notifyCtx, "starting EventRT server") + err := server.ListenAndServe() + if err != nil { + if err == http.ErrServerClosed { + // the service receives the shutdown signal normally and then closes + logger.Info(notifyCtx, "server closed under request") + } else { + // abnormal shutdown of service + logger.Error(notifyCtx, "server closed unexpected", "err", err) + } + } +} diff --git a/middleware/trace.go b/middleware/trace.go new file mode 100644 index 0000000..e08f349 --- /dev/null +++ b/middleware/trace.go @@ -0,0 +1,82 @@ +// Package middleware defines OTel tracing infrastructure and gin middlewares. +package middleware + +import ( + "context" + "fmt" + + "eventRT/config" + "eventRT/constants" + + "github.com/gin-gonic/gin" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" +) + +// InitTracerProvider creates an OTLP TracerProvider and registers it as the global provider. +// It registers the W3C TraceContext propagator (traceparent header). +// The caller is responsible for calling Shutdown on the returned provider during graceful shutdown. +func InitTracerProvider(ctx context.Context, cfg config.EventRTConfig) (*sdktrace.TracerProvider, error) { + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(cfg.OtelConfig.Endpoint), + } + if cfg.OtelConfig.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } + + exporter, err := otlptracehttp.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("create OTLP exporter: %w", err) + } + + res := sdkresource.NewSchemaless( + attribute.String("service.name", cfg.ServiceName), + attribute.String("deployment.environment", cfg.DeployEnv), + ) + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return tp, nil +} + +// StartTrace extracts upstream W3C trace context from request headers and starts a server span. +func StartTrace() gin.HandlerFunc { + tracer := otel.Tracer("eventRT/http") + return func(c *gin.Context) { + ctx := otel.GetTextMapPropagator().Extract( + c.Request.Context(), + propagation.HeaderCarrier(c.Request.Header), + ) + + spanName := c.FullPath() + if spanName == "" { + spanName = c.Request.URL.Path + } + ctx, span := tracer.Start(ctx, spanName, + oteltrace.WithSpanKind(oteltrace.SpanKindServer), + ) + defer span.End() + + spanCtx := span.SpanContext() + ctx = context.WithValue(ctx, constants.CtxKeyTraceID, spanCtx.TraceID().String()) + ctx = context.WithValue(ctx, constants.CtxKeySpanID, spanCtx.SpanID().String()) + + c.Request = c.Request.WithContext(ctx) + c.Set(constants.HeaderTraceID, spanCtx.TraceID().String()) + c.Set(constants.HeaderSpanID, spanCtx.SpanID().String()) + + c.Next() + } +} diff --git a/mq/mq_init.go b/mq/mq_init.go new file mode 100644 index 0000000..608ffaf --- /dev/null +++ b/mq/mq_init.go @@ -0,0 +1,217 @@ +// Package mq define message queue operation functions +package mq + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "sync" + "time" + + "eventRT/config" + "eventRT/logger" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/youmark/pkcs8" +) + +var ( + _globalRabbitMQProxy *RabbitMQProxy + rabbitMQOnce sync.Once +) + +// RabbitMQProxy define stuct of rabbitMQ connection proxy +type RabbitMQProxy struct { + tlsConf *tls.Config + conn *amqp.Connection + cancel context.CancelFunc + mu sync.Mutex +} + +// rabbitMQCertConf define stuct of rabbitMQ connection certificates config +type rabbitMQCertConf struct { + serverName string + insecureSkipVerify bool + clientCert tls.Certificate + caCertPool *x509.CertPool +} + +// GetConn define func to return the rabbitMQ connection +func GetConn() *amqp.Connection { + _globalRabbitMQProxy.mu.Lock() + defer _globalRabbitMQProxy.mu.Unlock() + return _globalRabbitMQProxy.conn +} + +// InitRabbitProxy return instance of rabbitMQ connection +func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { + amqpURI := generateRabbitMQURI(rCfg) + tlsConf, err := initCertConf(rCfg) + if err != nil { + logger.Error(ctx, "init rabbitMQ cert config failed", "error", err) + panic(err) + } + rabbitMQOnce.Do(func() { + cancelCtx, cancel := context.WithCancel(ctx) + conn := initRabbitMQ(ctx, amqpURI, tlsConf) + _globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel} + go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI) + }) + return _globalRabbitMQProxy +} + +// initRabbitMQ return instance of rabbitMQ connection +func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection { + logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI) + conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: tlsConf, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err != nil { + logger.Error(ctx, "init rabbitMQ connection failed", "error", err) + panic(err) + } + + return conn +} + +func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) { + for { + closeChan := make(chan *amqp.Error) + GetConn().NotifyClose(closeChan) + + select { + case <-ctx.Done(): + logger.Info(ctx, "context cancelled, exiting handleReconnect") + return + case err, ok := <-closeChan: + if !ok { + logger.Info(ctx, "rabbitMQ notify channel closed") + return + } + + if err == nil { + logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect") + return + } + + logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err) + } + + if !p.reconnect(ctx, rabbitMQURI) { + return + } + } +} + +func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool { + for { + logger.Info(ctx, "attempting to reconnect to rabbitMQ...") + select { + case <-ctx.Done(): + return false + case <-time.After(5 * time.Second): + + } + + newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: p.tlsConf, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err == nil { + p.mu.Lock() + p.conn = newConn + p.mu.Unlock() + logger.Info(ctx, "rabbitMQ reconnected successfully") + return true + } + logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err) + } +} + +// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine +func CloseRabbitProxy() { + if _globalRabbitMQProxy != nil { + _globalRabbitMQProxy.cancel() + _globalRabbitMQProxy.mu.Lock() + if _globalRabbitMQProxy.conn != nil { + _globalRabbitMQProxy.conn.Close() + } + _globalRabbitMQProxy.mu.Unlock() + } +} + +func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { + // TODO 考虑拆分用户名密码配置项,兼容不同认证方式 + // user := url.QueryEscape(rCfg.User) + // password := url.QueryEscape(rCfg.Password) + + // amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/", + // user, + // password, + // rCfg.Host, + // rCfg.Port, + // ) + amqpURI := fmt.Sprintf("amqps://%s:%d/", + rCfg.Host, + rCfg.Port, + ) + return amqpURI +} + +func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) { + tlsConf := &tls.Config{ + InsecureSkipVerify: rCfg.InsecureSkipVerify, + ServerName: rCfg.ServerName, + } + + caCert, err := os.ReadFile(rCfg.CACertPath) + if err != nil { + return nil, fmt.Errorf("read server ca file failed: %w", err) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath) + } + tlsConf.RootCAs = caCertPool + + certPEM, err := os.ReadFile(rCfg.ClientCertPath) + if err != nil { + return nil, fmt.Errorf("read client cert file failed: %w", err) + } + + keyData, err := os.ReadFile(rCfg.ClientKeyPath) + if err != nil { + return nil, fmt.Errorf("read private key file failed: %w", err) + } + + block, _ := pem.Decode(keyData) + if block == nil { + return nil, fmt.Errorf("failed to decode PEM block from private key") + } + + der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword)) + if err != nil { + return nil, fmt.Errorf("parse password-protected private key failed: %w", err) + } + + privBytes, err := x509.MarshalPKCS8PrivateKey(der) + if err != nil { + return nil, fmt.Errorf("marshal private key failed: %w", err) + } + + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + + clientCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("create x509 key pair failed: %w", err) + } + + tlsConf.Certificates = []tls.Certificate{clientCert} + return tlsConf, nil +} diff --git a/util/copy.go b/util/copy.go new file mode 100644 index 0000000..789c978 --- /dev/null +++ b/util/copy.go @@ -0,0 +1,35 @@ +// Package util provide some utility functions +package util + +import ( + "fmt" + "io" + "os" +) + +// CopyFile define func of copies a file from src to dst. +// If the destination file exists, it will be overwritten. +func CopyFile(src, dst string) error { + sourceFile, err := os.Open(src) + if err != nil { + return fmt.Errorf("failed to open source file %s: %w", src, err) + } + defer sourceFile.Close() + + destFile, err := os.Create(dst) + if err != nil { + return fmt.Errorf("failed to create destination file %s: %w", dst, err) + } + defer destFile.Close() + + _, err = io.Copy(destFile, sourceFile) + if err != nil { + return fmt.Errorf("failed to copy file contents from %s to %s: %w", src, dst, err) + } + + err = destFile.Sync() + if err != nil { + return fmt.Errorf("failed to sync destination file %s: %w", dst, err) + } + return nil +}