Compare commits
10 Commits
develop
...
chore/mode
| Author | SHA1 | Date |
|---|---|---|
|
|
ca68cf6c18 | |
|
|
c82ad773a3 | |
|
|
82622d0d85 | |
|
|
908c713565 | |
|
|
64b6562784 | |
|
|
05c64dda14 | |
|
|
c4e892f1c7 | |
|
|
195150d9b1 | |
|
|
3309e53653 | |
|
|
c6545e29ba |
|
|
@ -32,6 +32,7 @@ go.work
|
||||||
# ai config
|
# ai config
|
||||||
.cursor/
|
.cursor/
|
||||||
.claude/
|
.claude/
|
||||||
|
.codewhale/
|
||||||
.cursorrules
|
.cursorrules
|
||||||
.copilot/
|
.copilot/
|
||||||
.chatgpt/
|
.chatgpt/
|
||||||
|
|
|
||||||
|
|
@ -19,15 +19,15 @@ const (
|
||||||
|
|
||||||
// channel name suffix
|
// channel name suffix
|
||||||
const (
|
const (
|
||||||
ChannelSuffixP = "P"
|
ChannelSuffixP = "p"
|
||||||
ChannelSuffixQ = "Q"
|
ChannelSuffixQ = "q"
|
||||||
ChannelSuffixS = "S"
|
ChannelSuffixS = "s"
|
||||||
ChannelSuffixPS = "PS"
|
ChannelSuffixPF = "pf"
|
||||||
ChannelSuffixF = "F"
|
ChannelSuffixF = "f"
|
||||||
ChannelSuffixDeltaF = "deltaF"
|
ChannelSuffixDeltaF = "df"
|
||||||
ChannelSuffixUAB = "UAB"
|
ChannelSuffixUAB = "uab"
|
||||||
ChannelSuffixUBC = "UBC"
|
ChannelSuffixUBC = "ubc"
|
||||||
ChannelSuffixUCA = "UCA"
|
ChannelSuffixUCA = "uca"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
||||||
269
deploy/deploy.md
269
deploy/deploy.md
|
|
@ -640,6 +640,12 @@ openssl x509 -in eventrt_client_cert.pem -noout -subject
|
||||||
|
|
||||||
将服务器端三个证书文件打包为 K8s Secret(在证书文件所在目录执行):
|
将服务器端三个证书文件打包为 K8s Secret(在证书文件所在目录执行):
|
||||||
|
|
||||||
|
```bash
|
||||||
|
sh deploy/k8s/rabbitmq-certs-secret.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
该脚本等价于:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
kubectl create secret generic rabbitmq-certs \
|
kubectl create secret generic rabbitmq-certs \
|
||||||
--from-file=ca_certificate.pem=./ca_certificate.pem \
|
--from-file=ca_certificate.pem=./ca_certificate.pem \
|
||||||
|
|
@ -695,7 +701,11 @@ kubectl apply -f deploy/k8s/pg-service.yaml
|
||||||
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
|
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
|
||||||
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
|
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
|
||||||
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
|
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
|
||||||
| **存储** | `2Gi` | PVC `postgres-data` |
|
| **存储** | `6Gi` | PVC `postgres-data` |
|
||||||
|
| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 |
|
||||||
|
| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 |
|
||||||
|
|
||||||
|
> **注意:** 密码当前以明文形式存储在 `pg-configmap.yaml` 中,生产环境应将其迁移至 K8s Secret,并通过环境变量注入容器,避免将明文密码提交至版本库。
|
||||||
|
|
||||||
##### 4.4.1 等待 Pod 就绪
|
##### 4.4.1 等待 Pod 就绪
|
||||||
|
|
||||||
|
|
@ -703,7 +713,23 @@ kubectl apply -f deploy/k8s/pg-service.yaml
|
||||||
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
|
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
|
||||||
```
|
```
|
||||||
|
|
||||||
##### 4.4.2 初始化异步任务表
|
##### 4.4.2 连接验证
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 快速检查 PostgreSQL 是否接受连接
|
||||||
|
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||||
|
-- pg_isready -U postgres -d demo
|
||||||
|
|
||||||
|
# 进入 psql 执行简单查询确认数据库可用
|
||||||
|
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||||
|
-- psql -U postgres -d demo -c "SELECT current_database(), version();"
|
||||||
|
|
||||||
|
# 列出所有数据库(确认 demo 库已创建)
|
||||||
|
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||||
|
-- psql -U postgres -c "\l"
|
||||||
|
```
|
||||||
|
|
||||||
|
##### 4.4.3 初始化异步任务表
|
||||||
|
|
||||||
PostgreSQL 就绪后执行 1.4 节的建表 SQL,可通过以下方式进入容器执行:
|
PostgreSQL 就绪后执行 1.4 节的建表 SQL,可通过以下方式进入容器执行:
|
||||||
|
|
||||||
|
|
@ -717,14 +743,14 @@ kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metada
|
||||||
-- psql -U postgres -d demo < /path/to/init.sql
|
-- psql -U postgres -d demo < /path/to/init.sql
|
||||||
```
|
```
|
||||||
|
|
||||||
##### 4.4.3 状态检查
|
##### 4.4.4 状态检查
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
kubectl get pods -l app=postgres
|
kubectl get pods -l app=postgres
|
||||||
kubectl logs -l app=postgres --tail=30
|
kubectl logs -l app=postgres --tail=30
|
||||||
```
|
```
|
||||||
|
|
||||||
##### 4.4.4 清理
|
##### 4.4.5 清理
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
kubectl delete -f deploy/k8s/pg-service.yaml \
|
kubectl delete -f deploy/k8s/pg-service.yaml \
|
||||||
|
|
@ -733,68 +759,73 @@ kubectl delete -f deploy/k8s/pg-service.yaml \
|
||||||
-f deploy/k8s/pg-configmap.yaml
|
-f deploy/k8s/pg-configmap.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
#### 4.5 部署 MongoDB
|
|
||||||
|
|
||||||
```bash
|
|
||||||
kubectl apply -f deploy/k8s/mongodb-secret.yaml
|
|
||||||
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
|
|
||||||
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
|
|
||||||
kubectl apply -f deploy/k8s/mongodb-service.yaml
|
|
||||||
```
|
|
||||||
|
|
||||||
| 参数 | 值 | 说明 |
|
|
||||||
| :--- | :--- | :--- |
|
|
||||||
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
|
|
||||||
| **NodePort** | `30017` | 集群外访问端口 |
|
|
||||||
| **用户名** | `admin` | Root 管理员 |
|
|
||||||
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
|
|
||||||
| **存储** | `2Gi` | PVC `mongodb-data` |
|
|
||||||
|
|
||||||
> **注意:** 密码存储在 `mongodb-secret.yaml` 的 `stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
|
|
||||||
|
|
||||||
##### 4.5.1 等待 Pod 就绪
|
|
||||||
|
|
||||||
```bash
|
|
||||||
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
|
|
||||||
```
|
|
||||||
|
|
||||||
##### 4.5.2 连接验证
|
|
||||||
|
|
||||||
```bash
|
|
||||||
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
|
|
||||||
-- mongosh -u admin -p coslight --authenticationDatabase admin
|
|
||||||
```
|
|
||||||
|
|
||||||
##### 4.5.3 状态检查
|
|
||||||
|
|
||||||
```bash
|
|
||||||
kubectl get pods -l app=mongodb
|
|
||||||
kubectl logs -l app=mongodb --tail=30
|
|
||||||
```
|
|
||||||
|
|
||||||
##### 4.5.4 清理
|
|
||||||
|
|
||||||
```bash
|
|
||||||
kubectl delete -f deploy/k8s/mongodb-service.yaml \
|
|
||||||
-f deploy/k8s/mongodb-statefulset.yaml \
|
|
||||||
-f deploy/k8s/mongodb-pvc.yaml \
|
|
||||||
-f deploy/k8s/mongodb-secret.yaml
|
|
||||||
```
|
|
||||||
|
|
||||||
### 5\. 部署 ModelRT(Kubernetes)
|
### 5\. 部署 ModelRT(Kubernetes)
|
||||||
|
|
||||||
所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。
|
所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。
|
||||||
|
|
||||||
#### 5.1 构建并推送镜像
|
#### 5.1 构建并推送镜像
|
||||||
|
|
||||||
|
镜像采用三阶段构建,最终基于 `scratch`:
|
||||||
|
|
||||||
|
| 阶段 | 基础镜像 | 作用 |
|
||||||
|
| :--- | :--- | :--- |
|
||||||
|
| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0`,`-trimpath -ldflags="-s -w"`) |
|
||||||
|
| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义(UID 默认 `1000`) |
|
||||||
|
| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 |
|
||||||
|
|
||||||
|
**方式一:从源码构建并加载**
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# 在项目根目录执行
|
# 在项目根目录执行(默认运行用户 UID=1000)
|
||||||
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
|
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
|
||||||
|
|
||||||
# 推送到镜像仓库(或直接加载到 Minikube)
|
# 自定义运行用户 UID
|
||||||
|
docker build -f deploy/dockerfile/modelrt.Dockerfile \
|
||||||
|
--build-arg USER_ID=2000 \
|
||||||
|
-t coslight/modelrt:latest .
|
||||||
|
|
||||||
|
# 加载到 Minikube(无需私有仓库)
|
||||||
minikube image load coslight/modelrt:latest
|
minikube image load coslight/modelrt:latest
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**方式二:直接加载已有本地镜像**
|
||||||
|
|
||||||
|
Ubuntu 宿主机上已存在构建好的镜像(如 `modelrt:v1`)时,无需重新构建,直接导入 Minikube:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 确认本地镜像存在
|
||||||
|
docker images modelrt:v1
|
||||||
|
|
||||||
|
# 加载到 Minikube
|
||||||
|
minikube image load modelrt:v1
|
||||||
|
|
||||||
|
# 验证镜像已进入 Minikube 缓存
|
||||||
|
minikube image ls | grep modelrt
|
||||||
|
```
|
||||||
|
|
||||||
|
> **注意:** `deploy/k8s/modelrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。
|
||||||
|
|
||||||
|
#### 5.1.1 镜像冒烟测试
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 查看镜像大小(scratch 镜像预期 ≤ 25 MB)
|
||||||
|
docker images coslight/modelrt:latest
|
||||||
|
|
||||||
|
# 检查镜像元信息(确认 User、Cmd、架构)
|
||||||
|
docker inspect coslight/modelrt:latest
|
||||||
|
|
||||||
|
# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常)
|
||||||
|
docker run --rm coslight/modelrt:latest
|
||||||
|
|
||||||
|
# 挂载示例配置做完整启动验证(Ctrl+C 退出)
|
||||||
|
docker run --rm \
|
||||||
|
-v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \
|
||||||
|
-p 8080:8080 \
|
||||||
|
coslight/modelrt:latest
|
||||||
|
```
|
||||||
|
|
||||||
|
> **注意:** `scratch` 镜像不含 shell,无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`。
|
||||||
|
|
||||||
#### 5.2 创建客户端证书 Secret
|
#### 5.2 创建客户端证书 Secret
|
||||||
|
|
||||||
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
|
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
|
||||||
|
|
@ -864,7 +895,9 @@ kubectl delete secret modelrt-certs
|
||||||
|
|
||||||
### 6\. 部署可观测性栈(Kubernetes)
|
### 6\. 部署可观测性栈(Kubernetes)
|
||||||
|
|
||||||
在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。
|
在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。
|
||||||
|
|
||||||
|
> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`(DaemonSet)负责,它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。
|
||||||
|
|
||||||
#### 6.1 部署 Jaeger
|
#### 6.1 部署 Jaeger
|
||||||
|
|
||||||
|
|
@ -882,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml
|
||||||
kubectl apply -f deploy/k8s/loki-service.yaml
|
kubectl apply -f deploy/k8s/loki-service.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
#### 6.3 部署 Promtail
|
#### 6.3 部署 Alloy
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
kubectl apply -f deploy/k8s/promtail-rbac.yaml
|
kubectl apply -f deploy/k8s/alloy-rbac.yaml
|
||||||
kubectl apply -f deploy/k8s/promtail-configmap.yaml
|
kubectl apply -f deploy/k8s/alloy-configmap.yaml
|
||||||
kubectl apply -f deploy/k8s/promtail-daemonset.yaml
|
kubectl apply -f deploy/k8s/alloy-daemonset.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
|
> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole`(`alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml` 的 `config.alloy` 中。
|
||||||
|
|
||||||
#### 6.4 部署 Grafana
|
#### 6.4 部署 Grafana
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|
@ -907,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
|
||||||
-f deploy/k8s/loki-pvc.yaml \
|
-f deploy/k8s/loki-pvc.yaml \
|
||||||
-f deploy/k8s/loki-deployment.yaml \
|
-f deploy/k8s/loki-deployment.yaml \
|
||||||
-f deploy/k8s/loki-service.yaml \
|
-f deploy/k8s/loki-service.yaml \
|
||||||
-f deploy/k8s/promtail-rbac.yaml \
|
-f deploy/k8s/alloy-rbac.yaml \
|
||||||
-f deploy/k8s/promtail-configmap.yaml \
|
-f deploy/k8s/alloy-configmap.yaml \
|
||||||
-f deploy/k8s/promtail-daemonset.yaml \
|
-f deploy/k8s/alloy-daemonset.yaml \
|
||||||
-f deploy/k8s/grafana-configmap.yaml \
|
-f deploy/k8s/grafana-configmap.yaml \
|
||||||
-f deploy/k8s/grafana-deployment.yaml \
|
-f deploy/k8s/grafana-deployment.yaml \
|
||||||
-f deploy/k8s/grafana-service.yaml
|
-f deploy/k8s/grafana-service.yaml
|
||||||
|
|
@ -955,7 +990,6 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101)
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
ssh -L 5432:192.168.49.2:30432 \
|
ssh -L 5432:192.168.49.2:30432 \
|
||||||
-L 27017:192.168.49.2:30017 \
|
|
||||||
-L 5671:192.168.49.2:30671 \
|
-L 5671:192.168.49.2:30671 \
|
||||||
-L 15671:192.168.49.2:31671 \
|
-L 15671:192.168.49.2:31671 \
|
||||||
-L 6379:192.168.49.2:30001 \
|
-L 6379:192.168.49.2:30001 \
|
||||||
|
|
@ -971,7 +1005,6 @@ ssh -L 5432:192.168.49.2:30432 \
|
||||||
```bash
|
```bash
|
||||||
ssh -fN \
|
ssh -fN \
|
||||||
-L 5432:192.168.49.2:30432 \
|
-L 5432:192.168.49.2:30432 \
|
||||||
-L 27017:192.168.49.2:30017 \
|
|
||||||
-L 5671:192.168.49.2:30671 \
|
-L 5671:192.168.49.2:30671 \
|
||||||
-L 15671:192.168.49.2:31671 \
|
-L 15671:192.168.49.2:31671 \
|
||||||
-L 6379:192.168.49.2:30001 \
|
-L 6379:192.168.49.2:30001 \
|
||||||
|
|
@ -987,7 +1020,6 @@ ssh -fN \
|
||||||
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
|
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
|
||||||
| :--- | :--- | :--- | :--- |
|
| :--- | :--- | :--- | :--- |
|
||||||
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
|
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
|
||||||
| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` |
|
|
||||||
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
|
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
|
||||||
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
|
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
|
||||||
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
|
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
|
||||||
|
|
@ -1011,14 +1043,111 @@ kill <PID>
|
||||||
|
|
||||||
### 8\. 后续操作(停止与清理)
|
### 8\. 后续操作(停止与清理)
|
||||||
|
|
||||||
#### 8.1 停止容器
|
#### 8.1 本地 Docker 部署清理
|
||||||
|
|
||||||
|
适用于第 1、2 节使用 `docker run` 启动的 PostgreSQL 和 Redis 容器。
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
# 停止容器
|
||||||
docker stop postgres redis
|
docker stop postgres redis
|
||||||
```
|
|
||||||
|
|
||||||
#### 8.2 删除容器(删除后数据将丢失)
|
# 删除容器(容器内数据将同步丢失)
|
||||||
|
|
||||||
```bash
|
|
||||||
docker rm postgres redis
|
docker rm postgres redis
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### 8.2 本地运行清理
|
||||||
|
|
||||||
|
适用于第 3 节以 `go run` 或编译后二进制方式在本地启动的 ModelRT 服务。
|
||||||
|
|
||||||
|
前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 终止 go run 启动的进程
|
||||||
|
pkill -f "go run main.go"
|
||||||
|
|
||||||
|
# 或终止编译后的二进制进程
|
||||||
|
pkill model-rt
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 8.3 K8s(Minikube) 部署清理
|
||||||
|
|
||||||
|
适用于第 4、5、6 节在 Minikube 中部署的所有资源。
|
||||||
|
|
||||||
|
##### 8.3.1 分服务清理
|
||||||
|
|
||||||
|
**仅停止(缩容至 0,PVC 数据保留)**
|
||||||
|
|
||||||
|
将所有 Deployment 和 StatefulSet 缩容至 0 副本,Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 停止所有 Deployment(Redis / RabbitMQ / ModelRT / Jaeger / Loki / Grafana)
|
||||||
|
kubectl scale deployment --all --replicas=0
|
||||||
|
|
||||||
|
# 停止所有 StatefulSet(PostgreSQL,PVC 数据保留)
|
||||||
|
kubectl scale statefulset --all --replicas=0
|
||||||
|
```
|
||||||
|
|
||||||
|
恢复时:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
kubectl scale deployment --all --replicas=1
|
||||||
|
kubectl scale statefulset --all --replicas=1
|
||||||
|
```
|
||||||
|
|
||||||
|
> **注意:** DaemonSet(Alloy)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**永久清理(删除所有资源,包含 PVC,数据不可恢复)**
|
||||||
|
|
||||||
|
按部署顺序反向删除各服务资源:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 可观测性栈(Grafana / Alloy / Loki / Jaeger)
|
||||||
|
kubectl delete -f deploy/k8s/grafana-service.yaml \
|
||||||
|
-f deploy/k8s/grafana-deployment.yaml \
|
||||||
|
-f deploy/k8s/grafana-configmap.yaml \
|
||||||
|
-f deploy/k8s/alloy-daemonset.yaml \
|
||||||
|
-f deploy/k8s/alloy-configmap.yaml \
|
||||||
|
-f deploy/k8s/alloy-rbac.yaml \
|
||||||
|
-f deploy/k8s/loki-service.yaml \
|
||||||
|
-f deploy/k8s/loki-deployment.yaml \
|
||||||
|
-f deploy/k8s/loki-pvc.yaml \
|
||||||
|
-f deploy/k8s/loki-configmap.yaml \
|
||||||
|
-f deploy/k8s/jaeger-service.yaml \
|
||||||
|
-f deploy/k8s/jaeger-deployment.yaml
|
||||||
|
|
||||||
|
# ModelRT 应用
|
||||||
|
kubectl delete -f deploy/k8s/modelrt-service.yaml \
|
||||||
|
-f deploy/k8s/modelrt-deployment.yaml \
|
||||||
|
-f deploy/k8s/modelrt-configmap.yaml \
|
||||||
|
-f deploy/k8s/modelrt-secret.yaml
|
||||||
|
kubectl delete secret modelrt-certs
|
||||||
|
|
||||||
|
# PostgreSQL
|
||||||
|
kubectl delete -f deploy/k8s/pg-service.yaml \
|
||||||
|
-f deploy/k8s/pg-statefulset.yaml \
|
||||||
|
-f deploy/k8s/pg-pvc.yaml \
|
||||||
|
-f deploy/k8s/pg-configmap.yaml
|
||||||
|
|
||||||
|
# RabbitMQ
|
||||||
|
kubectl delete -f deploy/k8s/rabbitmq-service.yaml \
|
||||||
|
-f deploy/k8s/rabbitmq-deployment.yaml \
|
||||||
|
-f deploy/k8s/rabbitmq-users-config.yaml \
|
||||||
|
-f deploy/k8s/rabbitmq-config.yaml \
|
||||||
|
-f deploy/k8s/rabbitmq-secret.yaml
|
||||||
|
kubectl delete secret rabbitmq-certs
|
||||||
|
|
||||||
|
# Redis
|
||||||
|
kubectl delete -f deploy/k8s/redis-service.yaml \
|
||||||
|
-f deploy/k8s/redis-deployment.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
##### 8.3.2 一键清理
|
||||||
|
|
||||||
|
> **注意:** 此操作会删除 `deploy/k8s/` 下所有 YAML 对应的 K8s 资源,包括 PVC,**持久化数据将永久丢失**,请确认后执行。
|
||||||
|
|
||||||
|
```bash
|
||||||
|
kubectl delete -f deploy/k8s/
|
||||||
|
kubectl delete secret rabbitmq-certs modelrt-certs
|
||||||
|
```
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.25-alpine AS builder
|
FROM golang:1.26-alpine AS builder
|
||||||
RUN apk --no-cache upgrade
|
RUN apk --no-cache upgrade
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \
|
||||||
-mod=readonly \
|
-mod=readonly \
|
||||||
-o modelrt main.go
|
-o modelrt main.go
|
||||||
|
|
||||||
# Prepare runtime dependencies in a pinned Alpine stage so they can be
|
# prepare runtime dependencies in a pinned alpine stage so they can be
|
||||||
# copied into scratch without pulling any vulnerable OS packages at run time.
|
# copied into scratch without pulling any vulnerable os packages at run time.
|
||||||
FROM alpine:3.21 AS certs
|
FROM alpine:3.21 AS certs
|
||||||
ARG USER_ID=1000
|
ARG USER_ID=1000
|
||||||
RUN apk --no-cache add ca-certificates tzdata && \
|
RUN apk --no-cache add ca-certificates tzdata && \
|
||||||
|
|
@ -21,15 +21,14 @@ RUN apk --no-cache add ca-certificates tzdata && \
|
||||||
FROM scratch
|
FROM scratch
|
||||||
# CA certificates required for TLS connections (RabbitMQ amqps://)
|
# CA certificates required for TLS connections (RabbitMQ amqps://)
|
||||||
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
|
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
|
||||||
# Timezone data
|
# timezone data
|
||||||
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
|
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
|
||||||
# Non-root user/group definitions
|
# non-root user/group definitions
|
||||||
COPY --from=certs /etc/passwd /etc/passwd
|
COPY --from=certs /etc/passwd /etc/passwd
|
||||||
COPY --from=certs /etc/group /etc/group
|
COPY --from=certs /etc/group /etc/group
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY --from=builder /app/modelrt ./modelrt
|
COPY --from=builder /app/modelrt ./modelrt
|
||||||
COPY configs/config.example.yaml ./configs/config.example.yaml
|
|
||||||
|
|
||||||
USER modelrt
|
USER modelrt
|
||||||
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]
|
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ data:
|
||||||
- name: Loki
|
- name: Loki
|
||||||
type: loki
|
type: loki
|
||||||
access: proxy
|
access: proxy
|
||||||
url: http://loki:3100
|
url: http://loki-service:3100
|
||||||
isDefault: true
|
isDefault: true
|
||||||
jsonData:
|
jsonData:
|
||||||
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
|
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
|
||||||
|
|
@ -23,4 +23,4 @@ data:
|
||||||
type: jaeger
|
type: jaeger
|
||||||
uid: jaeger
|
uid: jaeger
|
||||||
access: proxy
|
access: proxy
|
||||||
url: http://jaeger:16686
|
url: http://jaeger-service:16686
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: grafana
|
- name: grafana
|
||||||
image: grafana/grafana:10.4.2
|
image: grafana/grafana:10.4.2
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 3000
|
- containerPort: 3000
|
||||||
env:
|
env:
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: grafana
|
name: grafana-service
|
||||||
namespace: default
|
namespace: default
|
||||||
spec:
|
spec:
|
||||||
ports:
|
ports:
|
||||||
- name: http
|
- name: http
|
||||||
port: 3000
|
port: 3000
|
||||||
targetPort: 3000
|
targetPort: 3000
|
||||||
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
|
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
|
||||||
selector:
|
selector:
|
||||||
app: grafana
|
app: grafana
|
||||||
type: NodePort
|
type: NodePort
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: jaeger
|
- name: jaeger
|
||||||
image: jaegertracing/all-in-one:1.56
|
image: jaegertracing/all-in-one:1.56
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
env:
|
env:
|
||||||
- name: COLLECTOR_OTLP_ENABLED
|
- name: COLLECTOR_OTLP_ENABLED
|
||||||
value: "true"
|
value: "true"
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: jaeger
|
name: jaeger-service
|
||||||
labels:
|
labels:
|
||||||
app: jaeger
|
app: jaeger
|
||||||
spec:
|
spec:
|
||||||
|
|
@ -9,19 +9,19 @@ spec:
|
||||||
- name: ui
|
- name: ui
|
||||||
port: 16686
|
port: 16686
|
||||||
targetPort: 16686
|
targetPort: 16686
|
||||||
nodePort: 31686 # Jaeger UI,浏览器访问 http://<NodeIP>:31686
|
nodePort: 31686 # Jaeger UI,浏览器访问 http://<NodeIP>:31686
|
||||||
- name: collector-http
|
- name: collector-http
|
||||||
port: 14268
|
port: 14268
|
||||||
targetPort: 14268
|
targetPort: 14268
|
||||||
nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel)
|
nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel)
|
||||||
- name: otlp-http
|
- name: otlp-http
|
||||||
port: 4318
|
port: 4318
|
||||||
targetPort: 4318
|
targetPort: 4318
|
||||||
nodePort: 31318 # OTLP HTTP,集群外使用 <NodeIP>:31318
|
nodePort: 31318 # OTLP HTTP,集群外使用 <NodeIP>:31318
|
||||||
- name: otlp-grpc
|
- name: otlp-grpc
|
||||||
port: 4317
|
port: 4317
|
||||||
targetPort: 4317
|
targetPort: 4317
|
||||||
nodePort: 31317 # OTLP gRPC,集群外使用 <NodeIP>:31317
|
nodePort: 31317 # OTLP gRPC,集群外使用 <NodeIP>:31317
|
||||||
selector:
|
selector:
|
||||||
app: jaeger
|
app: jaeger
|
||||||
type: NodePort
|
type: NodePort
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: loki
|
- name: loki
|
||||||
image: grafana/loki:2.9.4
|
image: grafana/loki:2.9.4
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
args:
|
args:
|
||||||
- -config.file=/etc/loki/loki.yaml
|
- -config.file=/etc/loki/loki.yaml
|
||||||
ports:
|
ports:
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: loki
|
name: loki-service
|
||||||
namespace: default
|
namespace: default
|
||||||
spec:
|
spec:
|
||||||
ports:
|
ports:
|
||||||
- name: http
|
- name: http
|
||||||
port: 3100
|
port: 3100
|
||||||
targetPort: 3100
|
targetPort: 3100
|
||||||
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
|
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
|
||||||
selector:
|
selector:
|
||||||
app: loki
|
app: loki
|
||||||
type: NodePort
|
type: NodePort
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ metadata:
|
||||||
data:
|
data:
|
||||||
config.yaml: |
|
config.yaml: |
|
||||||
postgres:
|
postgres:
|
||||||
host: "192.168.1.101"
|
host: "postgres-service"
|
||||||
port: 5432
|
port: 5432
|
||||||
database: "demo"
|
database: "demo"
|
||||||
user: "postgres"
|
user: "postgres"
|
||||||
|
|
@ -35,7 +35,7 @@ data:
|
||||||
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
|
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
|
||||||
|
|
||||||
otel:
|
otel:
|
||||||
endpoint: "jaeger:4318"
|
endpoint: "jaeger-service:4318"
|
||||||
insecure: true
|
insecure: true
|
||||||
|
|
||||||
ants:
|
ants:
|
||||||
|
|
@ -77,7 +77,7 @@ data:
|
||||||
service_addr: ":8080"
|
service_addr: ":8080"
|
||||||
service_name: "modelRT"
|
service_name: "modelRT"
|
||||||
secret_key: "" # injected via env SERVICE_SECRET_KEY
|
secret_key: "" # injected via env SERVICE_SECRET_KEY
|
||||||
deploy_env: "production"
|
deploy_env: "development"
|
||||||
|
|
||||||
dataRT:
|
dataRT:
|
||||||
host: "http://127.0.0.1"
|
host: "http://127.0.0.1"
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,9 @@ spec:
|
||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- name: modelrt
|
- name: modelrt
|
||||||
image: coslight/modelrt:latest
|
image: modelrt:v1
|
||||||
imagePullPolicy: IfNotPresent
|
imagePullPolicy: IfNotPresent
|
||||||
|
command: ["/app/modelrt"]
|
||||||
args:
|
args:
|
||||||
- "-modelRT_config_dir=/app/configs"
|
- "-modelRT_config_dir=/app/configs"
|
||||||
- "-modelRT_config_name=config"
|
- "-modelRT_config_name=config"
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: mongodb
|
name: mongodb-service
|
||||||
labels:
|
labels:
|
||||||
app: mongodb
|
app: mongodb
|
||||||
spec:
|
spec:
|
||||||
|
|
|
||||||
|
|
@ -34,9 +34,9 @@ spec:
|
||||||
- mongosh
|
- mongosh
|
||||||
- --eval
|
- --eval
|
||||||
- "db.adminCommand('ping')"
|
- "db.adminCommand('ping')"
|
||||||
initialDelaySeconds: 10
|
initialDelaySeconds: 30
|
||||||
periodSeconds: 5
|
periodSeconds: 10
|
||||||
timeoutSeconds: 3
|
timeoutSeconds: 10
|
||||||
failureThreshold: 12
|
failureThreshold: 12
|
||||||
livenessProbe:
|
livenessProbe:
|
||||||
exec:
|
exec:
|
||||||
|
|
@ -44,10 +44,10 @@ spec:
|
||||||
- mongosh
|
- mongosh
|
||||||
- --eval
|
- --eval
|
||||||
- "db.adminCommand('ping')"
|
- "db.adminCommand('ping')"
|
||||||
initialDelaySeconds: 30
|
initialDelaySeconds: 120
|
||||||
periodSeconds: 20
|
periodSeconds: 10
|
||||||
timeoutSeconds: 3
|
timeoutSeconds: 30
|
||||||
failureThreshold: 3
|
failureThreshold: 5
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
cpu: 100m
|
cpu: 100m
|
||||||
|
|
|
||||||
|
|
@ -7,4 +7,4 @@ spec:
|
||||||
- ReadWriteOnce
|
- ReadWriteOnce
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
storage: 2Gi
|
storage: 6Gi
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: postgres
|
name: postgres-service
|
||||||
labels:
|
labels:
|
||||||
app: postgres
|
app: postgres
|
||||||
spec:
|
spec:
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ data:
|
||||||
filename: /tmp/positions.yaml
|
filename: /tmp/positions.yaml
|
||||||
|
|
||||||
clients:
|
clients:
|
||||||
- url: http://loki:3100/loki/api/v1/push
|
- url: http://loki-service:3100/loki/api/v1/push
|
||||||
|
|
||||||
scrape_configs:
|
scrape_configs:
|
||||||
- job_name: kubernetes-pods
|
- job_name: kubernetes-pods
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: promtail
|
- name: promtail
|
||||||
image: grafana/promtail:2.9.4
|
image: grafana/promtail:2.9.4
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
args:
|
args:
|
||||||
- -config.file=/etc/promtail/promtail.yaml
|
- -config.file=/etc/promtail/promtail.yaml
|
||||||
ports:
|
ports:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
#!/bin/sh
|
||||||
|
# Create the rabbitmq server certificate secret.
|
||||||
|
# Run this script from the directory that contains the three cert files,
|
||||||
|
# or adjust the paths below to point at the actual files.
|
||||||
|
#
|
||||||
|
# Expected files (generated during RabbitMQ TLS setup):
|
||||||
|
# ca_certificate.pem
|
||||||
|
# server_certificate.pem
|
||||||
|
# server_key.pem
|
||||||
|
|
||||||
|
kubectl create secret generic rabbitmq-certs \
|
||||||
|
--from-file=ca_certificate.pem=./ca_certificate.pem \
|
||||||
|
--from-file=server_certificate.pem=./server_certificate.pem \
|
||||||
|
--from-file=server_key.pem=./server_key.pem
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
metadata:
|
metadata:
|
||||||
name: eventrt-rabbitmq
|
name: rabbitmq
|
||||||
spec:
|
spec:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
|
|
@ -15,6 +15,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: rabbitmq
|
- name: rabbitmq
|
||||||
image: rabbitmq:4.1.1-management-alpine
|
image: rabbitmq:4.1.1-management-alpine
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 4369
|
- containerPort: 4369
|
||||||
- containerPort: 5671
|
- containerPort: 5671
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
apiVersion: v1
|
||||||
|
kind: ConfigMap
|
||||||
|
metadata:
|
||||||
|
name: rabbit-plugins-conf
|
||||||
|
data:
|
||||||
|
enabled_plugins: |
|
||||||
|
[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch].
|
||||||
|
|
@ -15,6 +15,7 @@ spec:
|
||||||
containers:
|
containers:
|
||||||
- name: redis
|
- name: redis
|
||||||
image: redis/redis-stack-server:latest
|
image: redis/redis-stack-server:latest
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
memory: "128Mi"
|
memory: "128Mi"
|
||||||
|
|
|
||||||
|
|
@ -2,24 +2,20 @@
|
||||||
package diagram
|
package diagram
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
|
"modelRT/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// anchorValueOverview define struct of storage all anchor value
|
// anchorValueOverview define struct of storage all anchor value keyed by component uuid
|
||||||
var anchorValueOverview sync.Map
|
var anchorValueOverview util.TypedMap[string, string]
|
||||||
|
|
||||||
// GetAnchorValue define func of get circuit diagram data by componentID
|
// GetAnchorValue define func of get circuit diagram data by componentID
|
||||||
func GetAnchorValue(componentUUID string) (string, error) {
|
func GetAnchorValue(componentUUID string) (string, error) {
|
||||||
value, ok := diagramsOverview.Load(componentUUID)
|
anchorValue, ok := anchorValueOverview.Load(componentUUID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID)
|
return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID)
|
||||||
}
|
}
|
||||||
anchorValue, ok := value.(string)
|
|
||||||
if !ok {
|
|
||||||
return "", errors.New("convert to string failed")
|
|
||||||
}
|
|
||||||
return anchorValue, nil
|
return anchorValue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,32 +2,27 @@
|
||||||
package diagram
|
package diagram
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
|
"modelRT/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// diagramsOverview define struct of storage all circuit diagram data
|
// diagramsOverview define struct of storage all circuit diagram data keyed by component uuid
|
||||||
var diagramsOverview sync.Map
|
var diagramsOverview util.TypedMap[string, *orm.Component]
|
||||||
|
|
||||||
// GetComponentMap define func of get circuit diagram data by component uuid
|
// GetComponentMap define func of get circuit diagram data by component uuid
|
||||||
func GetComponentMap(componentUUID string) (*orm.Component, error) {
|
func GetComponentMap(componentUUID string) (*orm.Component, error) {
|
||||||
value, ok := diagramsOverview.Load(componentUUID)
|
componentInfo, ok := diagramsOverview.Load(componentUUID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID)
|
return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID)
|
||||||
}
|
}
|
||||||
componentInfo, ok := value.(*orm.Component)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("convert to component map struct failed")
|
|
||||||
}
|
|
||||||
return componentInfo, nil
|
return componentInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateComponentMap define func of update circuit diagram data by component uuid and component info
|
// UpdateComponentMap define func of update circuit diagram data by component uuid and component info
|
||||||
func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool {
|
func UpdateComponentMap(componentUUID string, componentInfo *orm.Component) bool {
|
||||||
_, result := diagramsOverview.Swap(componentID, componentInfo)
|
_, result := diagramsOverview.Swap(componentUUID, componentInfo)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -65,9 +65,7 @@ func (g *Graph) AddEdge(from, to uuid.UUID) {
|
||||||
|
|
||||||
// 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中
|
// 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中
|
||||||
// 则将其移除
|
// 则将其移除
|
||||||
if _, exist := g.FreeVertexs[toKey]; exist {
|
delete(g.FreeVertexs, toKey)
|
||||||
delete(g.FreeVertexs, toKey)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelNode delete a node to the graph
|
// DelNode delete a node to the graph
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,6 @@ package diagram
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"iter"
|
|
||||||
"maps"
|
|
||||||
|
|
||||||
locker "modelRT/distributedlock"
|
locker "modelRT/distributedlock"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error)
|
||||||
}
|
}
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Comparer[T any] interface {
|
|
||||||
Compare(T) int
|
|
||||||
}
|
|
||||||
|
|
||||||
type ComparableComparer[T any] interface {
|
|
||||||
Compare(T) int
|
|
||||||
comparable // 直接嵌入 comparable 约束
|
|
||||||
}
|
|
||||||
|
|
||||||
type methodNode[E Comparer[E]] struct {
|
|
||||||
value E
|
|
||||||
left *methodNode[E]
|
|
||||||
right *methodNode[E]
|
|
||||||
}
|
|
||||||
|
|
||||||
type MethodTree[E Comparer[E]] struct {
|
|
||||||
root *methodNode[E]
|
|
||||||
}
|
|
||||||
|
|
||||||
type OrderedSet[E interface {
|
|
||||||
comparable
|
|
||||||
Comparer[E]
|
|
||||||
}] struct {
|
|
||||||
tree MethodTree[E]
|
|
||||||
elements map[E]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type ComparableOrderedSet[E ComparableComparer[E]] struct {
|
|
||||||
tree MethodTree[E]
|
|
||||||
elements map[E]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type Set[E any] interface {
|
|
||||||
Insert(E)
|
|
||||||
Delete(E)
|
|
||||||
Has(E) bool
|
|
||||||
All() iter.Seq[E]
|
|
||||||
}
|
|
||||||
|
|
||||||
func InsertAll[E any](set Set[E], seq iter.Seq[E]) {
|
|
||||||
for v := range seq {
|
|
||||||
set.Insert(v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type HashSet[E comparable] map[E]bool
|
|
||||||
|
|
||||||
func (s HashSet[E]) Insert(v E) { s[v] = true }
|
|
||||||
func (s HashSet[E]) Delete(v E) { delete(s, v) }
|
|
||||||
func (s HashSet[E]) Has(v E) bool { return s[v] }
|
|
||||||
func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) }
|
|
||||||
|
|
|
||||||
|
|
@ -2,32 +2,27 @@
|
||||||
package diagram
|
package diagram
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
|
"modelRT/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// graphOverview define struct of storage all circuit diagram topologic data
|
// graphOverview define struct of storage all circuit diagram topologic data keyed by pageID
|
||||||
var graphOverview sync.Map
|
var graphOverview util.TypedMap[int64, *Graph]
|
||||||
|
|
||||||
// PrintGrapMap define func of print circuit diagram topologic info data
|
// PrintGrapMap define func of print circuit diagram topologic info data
|
||||||
func PrintGrapMap() {
|
func PrintGrapMap() {
|
||||||
graphOverview.Range(func(key, value any) bool {
|
for pageID, graph := range graphOverview.All() {
|
||||||
fmt.Println(key, value)
|
fmt.Println(pageID, graph)
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetGraphMap define func of get circuit diagram topologic data by pageID
|
// GetGraphMap define func of get circuit diagram topologic data by pageID
|
||||||
func GetGraphMap(pageID int64) (*Graph, error) {
|
func GetGraphMap(pageID int64) (*Graph, error) {
|
||||||
value, ok := graphOverview.Load(pageID)
|
graph, ok := graphOverview.Load(pageID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("can not find graph by pageID:%d", pageID)
|
return nil, fmt.Errorf("can not find graph by pageID:%d", pageID)
|
||||||
}
|
}
|
||||||
graph, ok := value.(*Graph)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("convert to graph struct failed")
|
|
||||||
}
|
|
||||||
return graph, nil
|
return graph, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -137,7 +137,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, resp)
|
c.JSON(http.StatusOK, resp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
diagram.UpdateComponentMap(info.ID, component)
|
diagram.UpdateComponentMap(info.UUID, component)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(request.FreeVertexs) > 0 {
|
if len(request.FreeVertexs) > 0 {
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package logger
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
|
@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface {
|
||||||
|
|
||||||
// Info define func for implementing gormLogger.Interface
|
// Info define func for implementing gormLogger.Interface
|
||||||
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
|
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
|
||||||
Info(ctx, msg, "data", data)
|
Info(ctx, fmt.Sprintf(msg, data...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Warn define func for implementing gormLogger.Interface
|
// Warn define func for implementing gormLogger.Interface
|
||||||
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
|
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
|
||||||
Warn(ctx, msg, "data", data)
|
Warn(ctx, fmt.Sprintf(msg, data...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error define func for implementing gormLogger.Interface
|
// Error define func for implementing gormLogger.Interface
|
||||||
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
|
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
|
||||||
Error(ctx, msg, "data", data)
|
Error(ctx, fmt.Sprintf(msg, data...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trace define func for implementing gormLogger.Interface
|
// Trace define func for implementing gormLogger.Interface
|
||||||
|
|
|
||||||
|
|
@ -47,8 +47,7 @@ func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
|
||||||
client: &http.Client{Timeout: 5 * time.Second},
|
client: &http.Client{Timeout: 5 * time.Second},
|
||||||
ch: make(chan string, 512),
|
ch: make(chan string, 512),
|
||||||
}
|
}
|
||||||
ls.wg.Add(1)
|
ls.wg.Go(ls.run)
|
||||||
go ls.run()
|
|
||||||
return ls
|
return ls
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -70,7 +69,6 @@ func (ls *lokiSyncer) Sync() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *lokiSyncer) run() {
|
func (ls *lokiSyncer) run() {
|
||||||
defer ls.wg.Done()
|
|
||||||
ticker := time.NewTicker(2 * time.Second)
|
ticker := time.NewTicker(2 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
|
|
||||||
8
main.go
8
main.go
|
|
@ -179,8 +179,8 @@ func main() {
|
||||||
// init async task worker
|
// init async task worker
|
||||||
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
|
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Failed to initialize task worker", "error", err)
|
logger.Error(ctx, "failed to initialize task worker", "error", err)
|
||||||
// Continue without task worker, but log warning
|
// continue without task worker, but log warning
|
||||||
} else {
|
} else {
|
||||||
go taskWorker.Start()
|
go taskWorker.Start()
|
||||||
defer taskWorker.Stop()
|
defer taskWorker.Stop()
|
||||||
|
|
@ -258,7 +258,7 @@ func main() {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
}
|
}
|
||||||
engine := gin.New()
|
engine := gin.New()
|
||||||
// 添加CORS中间件
|
// add CORS middleware
|
||||||
engine.Use(cors.New(cors.Config{
|
engine.Use(cors.New(cors.Config{
|
||||||
AllowOrigins: []string{"*"}, // 或指定具体域名
|
AllowOrigins: []string{"*"}, // 或指定具体域名
|
||||||
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
|
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
|
||||||
|
|
@ -278,7 +278,7 @@ func main() {
|
||||||
Handler: engine,
|
Handler: engine,
|
||||||
}
|
}
|
||||||
|
|
||||||
// creating a System Signal Receiver
|
// creating a system signal receiver
|
||||||
done := make(chan os.Signal, 10)
|
done := make(chan os.Signal, 10)
|
||||||
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
|
"modelRT/util"
|
||||||
|
|
||||||
"github.com/RediSearch/redisearch-go/v2/redisearch"
|
"github.com/RediSearch/redisearch-go/v2/redisearch"
|
||||||
)
|
)
|
||||||
|
|
@ -63,10 +64,9 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
||||||
}
|
}
|
||||||
|
|
||||||
safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags)
|
safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags)
|
||||||
gridSug := make([]redisearch.Suggestion, 0, len(measSet.AllGridTags))
|
gridSug := util.MapSlice(measSet.AllGridTags, func(gridTag string) redisearch.Suggestion {
|
||||||
for _, gridTag := range measSet.AllGridTags {
|
return redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}
|
||||||
gridSug = append(gridSug, redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore})
|
})
|
||||||
}
|
|
||||||
ac.AddTerms(gridSug...)
|
ac.AddTerms(gridSug...)
|
||||||
|
|
||||||
safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags)
|
safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags)
|
||||||
|
|
@ -78,19 +78,16 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
||||||
|
|
||||||
// building the grid -> zones hierarchy
|
// building the grid -> zones hierarchy
|
||||||
for gridTag, zoneTags := range measSet.GridToZoneTags {
|
for gridTag, zoneTags := range measSet.GridToZoneTags {
|
||||||
sug := make([]redisearch.Suggestion, 0, len(zoneTags))
|
// add redis fuzzy search suggestion for token1-token7 type
|
||||||
for _, zoneTag := range zoneTags {
|
sug := util.MapSlice(zoneTags, func(zoneTag string) redisearch.Suggestion {
|
||||||
term := fmt.Sprintf("%s.%s", gridTag, zoneTag)
|
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s", gridTag, zoneTag), Score: constants.DefaultScore}
|
||||||
// add redis fuzzy search suggestion for token1-token7 type
|
})
|
||||||
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
|
|
||||||
}
|
|
||||||
safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags)
|
safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags)
|
||||||
ac.AddTerms(sug...)
|
ac.AddTerms(sug...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// building the zone -> stations hierarchy
|
// building the zone -> stations hierarchy
|
||||||
for zoneTag, stationTags := range measSet.ZoneToStationTags {
|
for zoneTag, stationTags := range measSet.ZoneToStationTags {
|
||||||
sug := make([]redisearch.Suggestion, 0, len(stationTags))
|
|
||||||
gridTag, exists := zoneToGridPath[zoneTag]
|
gridTag, exists := zoneToGridPath[zoneTag]
|
||||||
if !exists {
|
if !exists {
|
||||||
err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag)
|
err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag)
|
||||||
|
|
@ -98,11 +95,10 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, stationTag := range stationTags {
|
// add redis fuzzy search suggestion for token1-token7 type
|
||||||
// add redis fuzzy search suggestion for token1-token7 type
|
sug := util.MapSlice(stationTags, func(stationTag string) redisearch.Suggestion {
|
||||||
term := fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag)
|
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag), Score: constants.DefaultScore}
|
||||||
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags)
|
safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags)
|
||||||
ac.AddTerms(sug...)
|
ac.AddTerms(sug...)
|
||||||
|
|
|
||||||
|
|
@ -59,36 +59,36 @@ func NewPower104DataSource(station string, packet, offset int) (*MeasurementData
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateChannelName(prefix string, number int, suffix string) (string, error) {
|
func generateChannelName(prefix string, number int, suffix string) (string, error) {
|
||||||
|
// shortPrefix is the literal prefix written into the channel name (tm/ts/tc),
|
||||||
|
// maxNumber is the inclusive upper bound, padWidth is the zero-padded digit width.
|
||||||
|
var shortPrefix string
|
||||||
|
var maxNumber, padWidth int
|
||||||
switch prefix {
|
switch prefix {
|
||||||
case constants.ChannelPrefixTelemetry:
|
case constants.ChannelPrefixTelemetry:
|
||||||
if number > 10 {
|
shortPrefix, maxNumber, padWidth = "tm", 8, 1
|
||||||
return "", common.ErrExceedsLimitType
|
|
||||||
}
|
|
||||||
var builder strings.Builder
|
|
||||||
numberStr := strconv.Itoa(number)
|
|
||||||
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
|
|
||||||
builder.WriteString(prefix)
|
|
||||||
builder.WriteString(numberStr)
|
|
||||||
builder.WriteString(suffix)
|
|
||||||
channelName := builder.String()
|
|
||||||
return channelName, nil
|
|
||||||
case constants.ChannelPrefixTelesignal:
|
case constants.ChannelPrefixTelesignal:
|
||||||
var numberStr string
|
shortPrefix, maxNumber, padWidth = "ts", 16, 2
|
||||||
if number < 10 {
|
case constants.ChannelPrefixTelecommand:
|
||||||
numberStr = "0" + strconv.Itoa(number)
|
shortPrefix, maxNumber, padWidth = "tc", 9, 1
|
||||||
}
|
|
||||||
numberStr = strconv.Itoa(number)
|
|
||||||
|
|
||||||
var builder strings.Builder
|
|
||||||
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
|
|
||||||
builder.WriteString(prefix)
|
|
||||||
builder.WriteString(numberStr)
|
|
||||||
builder.WriteString(suffix)
|
|
||||||
channelName := builder.String()
|
|
||||||
return channelName, nil
|
|
||||||
default:
|
default:
|
||||||
return "", common.ErrUnsupportedChannelPrefixType
|
return "", common.ErrUnsupportedChannelPrefixType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if number < 1 || number > maxNumber {
|
||||||
|
return "", common.ErrExceedsLimitType
|
||||||
|
}
|
||||||
|
|
||||||
|
numberStr := strconv.Itoa(number)
|
||||||
|
if len(numberStr) < padWidth {
|
||||||
|
numberStr = strings.Repeat("0", padWidth-len(numberStr)) + numberStr
|
||||||
|
}
|
||||||
|
|
||||||
|
var builder strings.Builder
|
||||||
|
builder.Grow(len(shortPrefix) + len(numberStr) + len(suffix))
|
||||||
|
builder.WriteString(shortPrefix)
|
||||||
|
builder.WriteString(numberStr)
|
||||||
|
builder.WriteString(suffix)
|
||||||
|
return builder.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTelemetryChannel define func of generate telemetry channel CL3611 data source
|
// NewTelemetryChannel define func of generate telemetry channel CL3611 data source
|
||||||
|
|
@ -109,6 +109,15 @@ func NewTelesignalChannel(station, device, channelNameSuffix string, channelNumb
|
||||||
return NewCL3611DataSource(station, device, channelName)
|
return NewCL3611DataSource(station, device, channelName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewTelecommandChannel define func of generate telecommand channel CL3611 data source
|
||||||
|
func NewTelecommandChannel(station, device, channelNameSuffix string, channelNumber int) (*MeasurementDataSource, error) {
|
||||||
|
channelName, err := generateChannelName(constants.ChannelPrefixTelecommand, channelNumber, channelNameSuffix)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to generate channel name: %w", err)
|
||||||
|
}
|
||||||
|
return NewCL3611DataSource(station, device, channelName)
|
||||||
|
}
|
||||||
|
|
||||||
// NewStandardChannel define func of generate standard channel CL3611 data source
|
// NewStandardChannel define func of generate standard channel CL3611 data source
|
||||||
func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) {
|
func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) {
|
||||||
return NewCL3611DataSource(station, device, channelType)
|
return NewCL3611DataSource(station, device, channelType)
|
||||||
|
|
@ -264,9 +273,9 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) {
|
||||||
func concatP104WithPlus(station string, packet int, offset int) string {
|
func concatP104WithPlus(station string, packet int, offset int) string {
|
||||||
packetStr := strconv.Itoa(packet)
|
packetStr := strconv.Itoa(packet)
|
||||||
offsetStr := strconv.Itoa(offset)
|
offsetStr := strconv.Itoa(offset)
|
||||||
return station + ":" + packetStr + ":" + offsetStr
|
return strings.ToLower(station + ":104:" + packetStr + ":" + offsetStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func concatCL361WithPlus(station, device, channel string) string {
|
func concatCL361WithPlus(station, device, channel string) string {
|
||||||
return station + ":" + device + ":" + "phasor" + ":" + channel
|
return strings.ToLower(station + ":" + device + ":" + "phasor" + ":" + channel)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
package realtimedata
|
package realtimedata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"modelRT/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ComputeConfig define struct of measurement computation
|
// ComputeConfig define struct of measurement computation
|
||||||
|
|
@ -19,54 +19,15 @@ type ComputeConfig struct {
|
||||||
Analyzer RealTimeAnalyzer
|
Analyzer RealTimeAnalyzer
|
||||||
}
|
}
|
||||||
|
|
||||||
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
// MeasComputeState define struct of manages the state of measurement
|
||||||
|
// computations. It embeds util.TypedMap to inherit the concurrency-safe,
|
||||||
|
// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without
|
||||||
|
// per-call-site type assertions.
|
||||||
type MeasComputeState struct {
|
type MeasComputeState struct {
|
||||||
measMap sync.Map
|
util.TypedMap[string, *ComputeConfig]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
|
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
|
||||||
func NewMeasComputeState() *MeasComputeState {
|
func NewMeasComputeState() *MeasComputeState {
|
||||||
return &MeasComputeState{}
|
return &MeasComputeState{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store define func to store a compute configuration for the specified key
|
|
||||||
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
|
|
||||||
m.measMap.Store(key, config)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load define func to retrieve the compute configuration for the specified key
|
|
||||||
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
|
|
||||||
value, ok := m.measMap.Load(key)
|
|
||||||
if !ok {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
return value.(*ComputeConfig), true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete define func to remove the compute configuration for the specified key
|
|
||||||
func (m *MeasComputeState) Delete(key string) {
|
|
||||||
m.measMap.Delete(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
|
|
||||||
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
|
|
||||||
value, loaded := m.measMap.LoadOrStore(key, config)
|
|
||||||
return value.(*ComputeConfig), loaded
|
|
||||||
}
|
|
||||||
|
|
||||||
// Range define func to iterate over all key-configuration pairs in the map
|
|
||||||
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
|
|
||||||
m.measMap.Range(func(key, value any) bool {
|
|
||||||
return f(key.(string), value.(*ComputeConfig))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Len define func to return the number of compute configurations in the map
|
|
||||||
func (m *MeasComputeState) Len() int {
|
|
||||||
count := 0
|
|
||||||
m.measMap.Range(func(_, _ any) bool {
|
|
||||||
count++
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType,
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
|
|
||||||
f.handlers[taskType] = handler
|
f.handlers[taskType] = handler
|
||||||
logger.Info(ctx, "Handler registered",
|
logger.Info(ctx, "handler registered",
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"handler_name", handler.Name(),
|
"handler_name", handler.Name(),
|
||||||
)
|
)
|
||||||
|
|
@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler {
|
||||||
|
|
||||||
// Execute processes an event analysis task
|
// Execute processes an event analysis task
|
||||||
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||||
logger.Info(ctx, "Starting event analysis",
|
logger.Info(ctx, "starting event analysis",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_params", params,
|
"task_params", params,
|
||||||
)
|
)
|
||||||
|
|
@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa
|
||||||
// 4. Storing results in database
|
// 4. Storing results in database
|
||||||
|
|
||||||
// Simulate work
|
// Simulate work
|
||||||
logger.Info(ctx, "Event analysis completed",
|
logger.Info(ctx, "event analysis completed",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_params", params,
|
"task_params", params,
|
||||||
"db", db,
|
"db", db,
|
||||||
|
|
@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler {
|
||||||
|
|
||||||
// Execute processes a batch import task
|
// Execute processes a batch import task
|
||||||
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||||
logger.Info(ctx, "Starting batch import",
|
logger.Info(ctx, "starting batch import",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_params", params,
|
"task_params", params,
|
||||||
"db", db,
|
"db", db,
|
||||||
|
|
@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para
|
||||||
// 4. Generating import report
|
// 4. Generating import report
|
||||||
|
|
||||||
// Simulate work
|
// Simulate work
|
||||||
logger.Info(ctx, "Batch import completed",
|
logger.Info(ctx, "batch import completed",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_params", params,
|
"task_params", params,
|
||||||
"db", db,
|
"db", db,
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D
|
||||||
return nil, fmt.Errorf("failed to create task worker: %w", err)
|
return nil, fmt.Errorf("failed to create task worker: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Task worker initialized",
|
logger.Info(ctx, "task worker initialized",
|
||||||
"worker_pool_size", workerCfg.PoolSize,
|
"worker_pool_size", workerCfg.PoolSize,
|
||||||
"queue_consumers", workerCfg.QueueConsumerCount,
|
"queue_consumers", workerCfg.QueueConsumerCount,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger {
|
||||||
|
|
||||||
// LogTaskMetrics records task processing metrics
|
// LogTaskMetrics records task processing metrics
|
||||||
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
|
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
|
||||||
logger.Info(m.ctx, "Task metrics",
|
logger.Info(m.ctx, "task metrics",
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"status", status,
|
"status", status,
|
||||||
"processing_time_ms", processingTime.Milliseconds(),
|
"processing_time_ms", processingTime.Milliseconds(),
|
||||||
|
|
@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process
|
||||||
|
|
||||||
// LogQueueMetrics records queue metrics
|
// LogQueueMetrics records queue metrics
|
||||||
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
|
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
|
||||||
logger.Info(m.ctx, "Queue metrics",
|
logger.Info(m.ctx, "queue metrics",
|
||||||
"queue_depth", queueDepth,
|
"queue_depth", queueDepth,
|
||||||
"queue_latency_ms", queueLatency.Milliseconds(),
|
"queue_latency_ms", queueLatency.Milliseconds(),
|
||||||
"metric_type", "queue",
|
"metric_type", "queue",
|
||||||
|
|
@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati
|
||||||
|
|
||||||
// LogWorkerMetrics records worker metrics
|
// LogWorkerMetrics records worker metrics
|
||||||
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
|
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
|
||||||
logger.Info(m.ctx, "Worker metrics",
|
logger.Info(m.ctx, "worker metrics",
|
||||||
"active_workers", activeWorkers,
|
"active_workers", activeWorkers,
|
||||||
"idle_workers", idleWorkers,
|
"idle_workers", idleWorkers,
|
||||||
"total_workers", totalWorkers,
|
"total_workers", totalWorkers,
|
||||||
|
|
@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker
|
||||||
|
|
||||||
// LogRetryMetrics records retry metrics
|
// LogRetryMetrics records retry metrics
|
||||||
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
|
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
|
||||||
logger.Info(m.ctx, "Retry metrics",
|
logger.Info(m.ctx, "retry metrics",
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"retry_count", retryCount,
|
"retry_count", retryCount,
|
||||||
"retry_success", success,
|
"retry_success", success,
|
||||||
|
|
@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
||||||
var memStats runtime.MemStats
|
var memStats runtime.MemStats
|
||||||
runtime.ReadMemStats(&memStats)
|
runtime.ReadMemStats(&memStats)
|
||||||
|
|
||||||
logger.Info(m.ctx, "System metrics",
|
logger.Info(m.ctx, "system metrics",
|
||||||
"metric_type", "system",
|
"metric_type", "system",
|
||||||
"timestamp", time.Now().Unix(),
|
"timestamp", time.Now().Unix(),
|
||||||
"goroutines", runtime.NumGoroutine(),
|
"goroutines", runtime.NumGoroutine(),
|
||||||
|
|
@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
||||||
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
|
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
|
||||||
duration := endTime.Sub(startTime)
|
duration := endTime.Sub(startTime)
|
||||||
|
|
||||||
logger.Info(m.ctx, "Task completion metrics",
|
logger.Info(m.ctx, "task completion metrics",
|
||||||
"metric_type", "task_completion",
|
"metric_type", "task_completion",
|
||||||
"timestamp", time.Now().Unix(),
|
"timestamp", time.Now().Unix(),
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
|
|
@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string
|
||||||
|
|
||||||
// LogHealthCheckMetrics records health check metrics
|
// LogHealthCheckMetrics records health check metrics
|
||||||
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
|
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
|
||||||
logger.Info(m.ctx, "Health check metrics",
|
logger.Info(m.ctx, "health check metrics",
|
||||||
"metric_type", "health_check",
|
"metric_type", "health_check",
|
||||||
"timestamp", time.Now().Unix(),
|
"timestamp", time.Now().Unix(),
|
||||||
"healthy", healthy,
|
"healthy", healthy,
|
||||||
|
|
|
||||||
|
|
@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error {
|
||||||
// Declare durable direct exchange
|
// Declare durable direct exchange
|
||||||
err := p.ch.ExchangeDeclare(
|
err := p.ch.ExchangeDeclare(
|
||||||
constants.TaskExchangeName, // name
|
constants.TaskExchangeName, // name
|
||||||
"direct", // type
|
"direct", // type
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // auto-deleted
|
false, // auto-deleted
|
||||||
false, // internal
|
false, // internal
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
nil, // arguments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to declare exchange: %w", err)
|
return fmt.Errorf("failed to declare exchange: %w", err)
|
||||||
|
|
@ -81,12 +81,12 @@ func (p *QueueProducer) declareInfrastructure() error {
|
||||||
// Declare durable queue with priority support and message TTL
|
// Declare durable queue with priority support and message TTL
|
||||||
_, err = p.ch.QueueDeclare(
|
_, err = p.ch.QueueDeclare(
|
||||||
constants.TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
|
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
|
||||||
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error {
|
||||||
constants.TaskQueueName, // queue name
|
constants.TaskQueueName, // queue name
|
||||||
constants.TaskRoutingKey, // routing key
|
constants.TaskRoutingKey, // routing key
|
||||||
constants.TaskExchangeName, // exchange name
|
constants.TaskExchangeName, // exchange name
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
nil, // arguments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to bind queue: %w", err)
|
return fmt.Errorf("failed to bind queue: %w", err)
|
||||||
|
|
@ -148,15 +148,15 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
|
||||||
ctx,
|
ctx,
|
||||||
constants.TaskExchangeName, // exchange
|
constants.TaskExchangeName, // exchange
|
||||||
constants.TaskRoutingKey, // routing key
|
constants.TaskRoutingKey, // routing key
|
||||||
false, // mandatory
|
false, // mandatory
|
||||||
false, // immediate
|
false, // immediate
|
||||||
publishing,
|
publishing,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to publish task message: %w", err)
|
return fmt.Errorf("failed to publish task message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Task published to queue",
|
logger.Info(ctx, "task published to queue",
|
||||||
"task_id", taskID.String(),
|
"task_id", taskID.String(),
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"priority", priority,
|
"priority", priority,
|
||||||
|
|
@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU
|
||||||
backoff := time.Duration(1<<uint(i)) * time.Second
|
backoff := time.Duration(1<<uint(i)) * time.Second
|
||||||
backoff = min(backoff, 10*time.Second)
|
backoff = min(backoff, 10*time.Second)
|
||||||
|
|
||||||
logger.Warn(ctx, "Failed to publish task, retrying",
|
logger.Warn(ctx, "failed to publish task, retrying",
|
||||||
"task_id", taskID.String(),
|
"task_id", taskID.String(),
|
||||||
"attempt", i+1,
|
"attempt", i+1,
|
||||||
"max_retries", maxRetries,
|
"max_retries", maxRetries,
|
||||||
|
|
@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error {
|
||||||
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
||||||
queue, err := p.ch.QueueDeclarePassive(
|
queue, err := p.ch.QueueDeclarePassive(
|
||||||
constants.TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": constants.TaskMaxPriority,
|
"x-max-priority": constants.TaskMaxPriority,
|
||||||
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
||||||
|
|
@ -246,20 +246,20 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
|
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
|
||||||
return
|
return
|
||||||
case msg, ok := <-taskChan:
|
case task, ok := <-taskChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Info(ctx, "task channel closed, exiting push loop")
|
logger.Info(ctx, "task channel closed, exiting push loop")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Restore trace context from the handler that enqueued this message
|
// Restore trace context from the handler that enqueued this message
|
||||||
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
|
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier))
|
||||||
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
|
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
|
||||||
oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())),
|
oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())),
|
||||||
)
|
)
|
||||||
if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil {
|
if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil {
|
||||||
pubSpan.RecordError(err)
|
pubSpan.RecordError(err)
|
||||||
logger.Error(taskCtx, "publish task to RabbitMQ failed",
|
logger.Error(taskCtx, "publish task to RabbitMQ failed",
|
||||||
"task_id", msg.TaskID, "error", err)
|
"task_id", task.TaskID, "error", err)
|
||||||
}
|
}
|
||||||
pubSpan.End()
|
pubSpan.End()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ func NewExponentialBackoffRetry(maxRetries int, initialDelay, maxDelay time.Dura
|
||||||
// ShouldRetry implements exponential backoff with jitter
|
// ShouldRetry implements exponential backoff with jitter
|
||||||
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
|
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
|
||||||
if retryCount >= s.MaxRetries {
|
if retryCount >= s.MaxRetries {
|
||||||
logger.Info(ctx, "Task reached maximum retry count",
|
logger.Info(ctx, "task reached maximum retry count",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"retry_count", retryCount,
|
"retry_count", retryCount,
|
||||||
"max_retries", s.MaxRetries,
|
"max_retries", s.MaxRetries,
|
||||||
|
|
@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Task will be retried",
|
logger.Info(ctx, "task will be retried",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"retry_count", retryCount,
|
"retry_count", retryCount,
|
||||||
"next_retry_in", delay,
|
"next_retry_in", delay,
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
||||||
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
|
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
|
||||||
if !shouldRetry {
|
if !shouldRetry {
|
||||||
// Mark task as permanently failed
|
// Mark task as permanently failed
|
||||||
logger.Info(ctx, "Task will not be retried, marking as failed",
|
logger.Info(ctx, "task will not be retried, marking as failed",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"retry_count", retryCount,
|
"retry_count", retryCount,
|
||||||
"max_retries", q.strategy.GetMaxRetries(),
|
"max_retries", q.strategy.GetMaxRetries(),
|
||||||
|
|
@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
||||||
}
|
}
|
||||||
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
|
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
|
||||||
// Log but don't fail the whole retry scheduling
|
// Log but don't fail the whole retry scheduling
|
||||||
logger.Warn(ctx, "Failed to update task error info",
|
logger.Warn(ctx, "failed to update task error info",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Failed to schedule task retry",
|
logger.Error(ctx, "failed to schedule task retry",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"retry_count", retryCount,
|
"retry_count", retryCount,
|
||||||
|
|
@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Task scheduled for retry",
|
logger.Info(ctx, "task scheduled for retry",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"retry_count", retryCount+1,
|
"retry_count", retryCount+1,
|
||||||
|
|
@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
||||||
// Get tasks due for retry
|
// Get tasks due for retry
|
||||||
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
|
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Failed to get tasks for retry", "error", err)
|
logger.Error(ctx, "failed to get tasks for retry", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Processing retry queue",
|
logger.Info(ctx, "processing retry queue",
|
||||||
"task_count", len(tasks),
|
"task_count", len(tasks),
|
||||||
"batch_size", batchSize,
|
"batch_size", batchSize,
|
||||||
)
|
)
|
||||||
|
|
@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
||||||
// Publish task to queue for immediate processing
|
// Publish task to queue for immediate processing
|
||||||
taskType := TaskType(task.TaskType)
|
taskType := TaskType(task.TaskType)
|
||||||
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
|
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
|
||||||
logger.Error(ctx, "Failed to publish retry task to queue",
|
logger.Error(ctx, "failed to publish retry task to queue",
|
||||||
"task_id", task.TaskID,
|
"task_id", task.TaskID,
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"error", err,
|
"error", err,
|
||||||
|
|
@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
||||||
|
|
||||||
// Update task status back to submitted
|
// Update task status back to submitted
|
||||||
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
|
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
|
||||||
logger.Warn(ctx, "Failed to update retry task status",
|
logger.Warn(ctx, "failed to update retry task status",
|
||||||
"task_id", task.TaskID,
|
"task_id", task.TaskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
||||||
|
|
||||||
// Clear next retry time since task is being retried now
|
// Clear next retry time since task is being retried now
|
||||||
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
|
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
|
||||||
logger.Warn(ctx, "Failed to clear next retry time",
|
logger.Warn(ctx, "failed to clear next retry time",
|
||||||
"task_id", task.TaskID,
|
"task_id", task.TaskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Retry task resubmitted",
|
logger.Info(ctx, "retry task resubmitted",
|
||||||
"task_id", task.TaskID,
|
"task_id", task.TaskID,
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"retry_count", task.RetryCount,
|
"retry_count", task.RetryCount,
|
||||||
|
|
@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info(ctx, "Retry scheduler stopping")
|
logger.Info(ctx, "retry scheduler stopping")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
|
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
|
||||||
logger.Error(ctx, "Error processing retry queue", "error", err)
|
logger.Error(ctx, "error processing retry queue", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
||||||
return fmt.Errorf("invalid parameter type for TestTask")
|
return fmt.Errorf("invalid parameter type for TestTask")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Starting test task executionser",
|
logger.Info(ctx, "starting test task executionser",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"sleep_duration_seconds", params.SleepDuration,
|
"sleep_duration_seconds", params.SleepDuration,
|
||||||
"message", params.Message,
|
"message", params.Message,
|
||||||
|
|
@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
||||||
|
|
||||||
// Save result to database
|
// Save result to database
|
||||||
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
|
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
|
||||||
logger.Error(ctx, "Failed to save test task result",
|
logger.Error(ctx, "failed to save test task result",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
return fmt.Errorf("failed to save task result: %w", err)
|
return fmt.Errorf("failed to save task result: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Test task completed successfully",
|
logger.Info(ctx, "test task completed successfully",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"sleep_duration_seconds", params.SleepDuration,
|
"sleep_duration_seconds", params.SleepDuration,
|
||||||
)
|
)
|
||||||
|
|
@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler {
|
||||||
|
|
||||||
// Execute processes a test task using the unified task interface
|
// Execute processes a test task using the unified task interface
|
||||||
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||||
logger.Info(ctx, "Executing test task",
|
logger.Info(ctx, "executing test task",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"task_params", params,
|
"task_params", params,
|
||||||
"db", db,
|
"db", db,
|
||||||
|
|
|
||||||
|
|
@ -178,30 +178,26 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
|
||||||
|
|
||||||
// Start begins consuming tasks from the queue
|
// Start begins consuming tasks from the queue
|
||||||
func (w *TaskWorker) Start() error {
|
func (w *TaskWorker) Start() error {
|
||||||
logger.Info(w.ctx, "Starting task worker",
|
logger.Info(w.ctx, "starting task worker",
|
||||||
"pool_size", w.cfg.PoolSize,
|
"pool_size", w.cfg.PoolSize,
|
||||||
"queue_consumers", w.cfg.QueueConsumerCount,
|
"queue_consumers", w.cfg.QueueConsumerCount,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Start multiple consumers for better throughput
|
// Start multiple consumers for better throughput
|
||||||
for i := 0; i < w.cfg.QueueConsumerCount; i++ {
|
for i := 0; i < w.cfg.QueueConsumerCount; i++ {
|
||||||
w.wg.Add(1)
|
w.wg.Go(func() { w.consumerLoop(i) })
|
||||||
go w.consumerLoop(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start health check goroutine
|
// Start health check goroutine
|
||||||
w.wg.Add(1)
|
w.wg.Go(w.healthCheckLoop)
|
||||||
go w.healthCheckLoop()
|
|
||||||
|
|
||||||
logger.Info(w.ctx, "Task worker started successfully")
|
logger.Info(w.ctx, "task worker started successfully")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// consumerLoop runs a single RabbitMQ consumer
|
// consumerLoop runs a single RabbitMQ consumer
|
||||||
func (w *TaskWorker) consumerLoop(consumerID int) {
|
func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||||
defer w.wg.Done()
|
logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID)
|
||||||
|
|
||||||
logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID)
|
|
||||||
|
|
||||||
// Consume messages from the queue
|
// Consume messages from the queue
|
||||||
msgs, err := w.ch.Consume(
|
msgs, err := w.ch.Consume(
|
||||||
|
|
@ -214,7 +210,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||||
nil, // args
|
nil, // args
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(w.ctx, "Failed to start consumer",
|
logger.Error(w.ctx, "failed to start consumer",
|
||||||
"consumer_id", consumerID,
|
"consumer_id", consumerID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -224,11 +220,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-w.stopChan:
|
case <-w.stopChan:
|
||||||
logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID)
|
logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID)
|
||||||
return
|
return
|
||||||
case msg, ok := <-msgs:
|
case msg, ok := <-msgs:
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID)
|
logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -237,7 +233,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||||
w.handleMessage(msg)
|
w.handleMessage(msg)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(w.ctx, "Failed to submit task to pool",
|
logger.Error(w.ctx, "failed to submit task to pool",
|
||||||
"consumer_id", consumerID,
|
"consumer_id", consumerID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -265,7 +261,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
// Parse task message
|
// Parse task message
|
||||||
var taskMsg TaskQueueMessage
|
var taskMsg TaskQueueMessage
|
||||||
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
|
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
|
||||||
logger.Error(ctx, "Failed to unmarshal task message", "error", err)
|
logger.Error(ctx, "failed to unmarshal task message", "error", err)
|
||||||
msg.Nack(false, false) // Reject without requeue
|
msg.Nack(false, false) // Reject without requeue
|
||||||
w.metrics.mu.Lock()
|
w.metrics.mu.Lock()
|
||||||
w.metrics.TotalFailed++
|
w.metrics.TotalFailed++
|
||||||
|
|
@ -275,7 +271,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
|
|
||||||
// Validate message
|
// Validate message
|
||||||
if !taskMsg.Validate() {
|
if !taskMsg.Validate() {
|
||||||
logger.Error(ctx, "Invalid task message",
|
logger.Error(ctx, "invalid task message",
|
||||||
"task_id", taskMsg.TaskID,
|
"task_id", taskMsg.TaskID,
|
||||||
"task_type", taskMsg.TaskType,
|
"task_type", taskMsg.TaskType,
|
||||||
)
|
)
|
||||||
|
|
@ -299,7 +295,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
defer span.End()
|
defer span.End()
|
||||||
ctx = taskCtx
|
ctx = taskCtx
|
||||||
|
|
||||||
logger.Info(ctx, "Processing task",
|
logger.Info(ctx, "processing task",
|
||||||
"task_id", taskMsg.TaskID,
|
"task_id", taskMsg.TaskID,
|
||||||
"task_type", taskMsg.TaskType,
|
"task_type", taskMsg.TaskType,
|
||||||
"priority", taskMsg.Priority,
|
"priority", taskMsg.Priority,
|
||||||
|
|
@ -307,7 +303,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
|
|
||||||
// Update task status to RUNNING in database
|
// Update task status to RUNNING in database
|
||||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
|
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
|
||||||
logger.Error(ctx, "Failed to update task status", "error", err)
|
logger.Error(ctx, "failed to update task status", "error", err)
|
||||||
msg.Nack(false, true) // Reject with requeue
|
msg.Nack(false, true) // Reject with requeue
|
||||||
w.metrics.mu.Lock()
|
w.metrics.mu.Lock()
|
||||||
w.metrics.TotalFailed++
|
w.metrics.TotalFailed++
|
||||||
|
|
@ -326,7 +322,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
processingTime := time.Since(startTime)
|
processingTime := time.Since(startTime)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Task execution failed",
|
logger.Error(ctx, "task execution failed",
|
||||||
"task_id", taskMsg.TaskID,
|
"task_id", taskMsg.TaskID,
|
||||||
"task_type", taskMsg.TaskType,
|
"task_type", taskMsg.TaskType,
|
||||||
"processing_time", processingTime,
|
"processing_time", processingTime,
|
||||||
|
|
@ -335,7 +331,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
|
|
||||||
// Update task status to FAILED
|
// Update task status to FAILED
|
||||||
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
|
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
|
||||||
logger.Error(ctx, "Failed to update task with error", "error", updateErr)
|
logger.Error(ctx, "failed to update task with error", "error", updateErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
|
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
|
||||||
|
|
@ -353,7 +349,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
|
|
||||||
// Update task status to COMPLETED
|
// Update task status to COMPLETED
|
||||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
|
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
|
||||||
logger.Error(ctx, "Failed to update task status to completed", "error", err)
|
logger.Error(ctx, "failed to update task status to completed", "error", err)
|
||||||
// Still ack the message since task was processed successfully
|
// Still ack the message since task was processed successfully
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -364,7 +360,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
||||||
// Acknowledge message
|
// Acknowledge message
|
||||||
msg.Ack(false)
|
msg.Ack(false)
|
||||||
|
|
||||||
logger.Info(ctx, "Task completed successfully",
|
logger.Info(ctx, "task completed successfully",
|
||||||
"task_id", taskMsg.TaskID,
|
"task_id", taskMsg.TaskID,
|
||||||
"task_type", taskMsg.TaskType,
|
"task_type", taskMsg.TaskType,
|
||||||
"processing_time", processingTime,
|
"processing_time", processingTime,
|
||||||
|
|
@ -398,7 +394,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
||||||
// Update task status in database
|
// Update task status in database
|
||||||
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
|
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Failed to update task status in database",
|
logger.Error(ctx, "failed to update task status in database",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"status", status,
|
"status", status,
|
||||||
"error", err,
|
"error", err,
|
||||||
|
|
@ -410,7 +406,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
||||||
if status == StatusRunning {
|
if status == StatusRunning {
|
||||||
startedAt := time.Now().Unix()
|
startedAt := time.Now().Unix()
|
||||||
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
|
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
|
||||||
logger.Warn(ctx, "Failed to update task start time",
|
logger.Warn(ctx, "failed to update task start time",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -423,14 +419,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
||||||
finishedAt := time.Now().Unix()
|
finishedAt := time.Now().Unix()
|
||||||
if status == StatusCompleted {
|
if status == StatusCompleted {
|
||||||
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||||
logger.Warn(ctx, "Failed to mark task as completed",
|
logger.Warn(ctx, "failed to mark task as completed",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||||
logger.Warn(ctx, "Failed to mark task as failed",
|
logger.Warn(ctx, "failed to mark task as failed",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
|
|
@ -438,7 +434,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug(ctx, "Task status updated",
|
logger.Debug(ctx, "task status updated",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"status", status,
|
"status", status,
|
||||||
)
|
)
|
||||||
|
|
@ -451,16 +447,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
||||||
stackTrace := fmt.Sprintf("%+v", err)
|
stackTrace := fmt.Sprintf("%+v", err)
|
||||||
|
|
||||||
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
|
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
|
||||||
logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr)
|
logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr)
|
||||||
return updateErr
|
return updateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
|
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
|
||||||
logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr)
|
logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr)
|
||||||
return updateErr
|
return updateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg)
|
logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -469,7 +465,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
||||||
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
|
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
|
||||||
handler, err := w.factory.GetHandler(taskType)
|
handler, err := w.factory.GetHandler(taskType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "No handler for task type", "task_type", taskType)
|
logger.Error(ctx, "no handler for task type", "task_type", taskType)
|
||||||
msg.Nack(false, false)
|
msg.Nack(false, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -478,8 +474,6 @@ func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uui
|
||||||
|
|
||||||
// healthCheckLoop periodically checks worker health and metrics
|
// healthCheckLoop periodically checks worker health and metrics
|
||||||
func (w *TaskWorker) healthCheckLoop() {
|
func (w *TaskWorker) healthCheckLoop() {
|
||||||
defer w.wg.Done()
|
|
||||||
|
|
||||||
ticker := time.NewTicker(w.cfg.PollingInterval)
|
ticker := time.NewTicker(w.cfg.PollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
|
@ -519,7 +513,7 @@ func (w *TaskWorker) checkHealth() {
|
||||||
w.metrics.WorkersIdle = w.pool.Free()
|
w.metrics.WorkersIdle = w.pool.Free()
|
||||||
w.metrics.LastHealthCheck = time.Now()
|
w.metrics.LastHealthCheck = time.Now()
|
||||||
|
|
||||||
logger.Info(w.ctx, "Worker health check",
|
logger.Info(w.ctx, "worker health check",
|
||||||
"tasks_processed", w.metrics.TotalProcessed,
|
"tasks_processed", w.metrics.TotalProcessed,
|
||||||
"tasks_failed", w.metrics.TotalFailed,
|
"tasks_failed", w.metrics.TotalFailed,
|
||||||
"tasks_success", w.metrics.TotalSuccess,
|
"tasks_success", w.metrics.TotalSuccess,
|
||||||
|
|
@ -536,7 +530,7 @@ func (w *TaskWorker) checkHealth() {
|
||||||
|
|
||||||
// Stop gracefully stops the task worker
|
// Stop gracefully stops the task worker
|
||||||
func (w *TaskWorker) Stop() error {
|
func (w *TaskWorker) Stop() error {
|
||||||
logger.Info(w.ctx, "Stopping task worker")
|
logger.Info(w.ctx, "stopping task worker")
|
||||||
|
|
||||||
// Signal all goroutines to stop
|
// Signal all goroutines to stop
|
||||||
close(w.stopChan)
|
close(w.stopChan)
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,22 @@ import (
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MapSlice define func to build a new slice by applying f to every element of s.
|
||||||
|
func MapSlice[T, U any](s []T, f func(T) U) []U {
|
||||||
|
result := make([]U, 0, len(s))
|
||||||
|
for _, item := range s {
|
||||||
|
result = append(result, f(item))
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
||||||
func ConvertZSetMembersToFloat64(members []redis.Z) []float64 {
|
func ConvertZSetMembersToFloat64(members []redis.Z) []float64 {
|
||||||
dataFloats := make([]float64, 0, len(members))
|
|
||||||
// recovery time sorted in ascending order
|
// recovery time sorted in ascending order
|
||||||
sortRedisZByTimeMemberAscending(members)
|
sortRedisZByTimeMemberAscending(members)
|
||||||
for _, member := range members {
|
return MapSlice(members, func(member redis.Z) float64 {
|
||||||
dataFloats = append(dataFloats, member.Score)
|
return member.Score
|
||||||
}
|
})
|
||||||
return dataFloats
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func sortRedisZByTimeMemberAscending(data []redis.Z) {
|
func sortRedisZByTimeMemberAscending(data []redis.Z) {
|
||||||
|
|
|
||||||
16
util/map.go
16
util/map.go
|
|
@ -1,11 +1,13 @@
|
||||||
// Package util provide some utility functions
|
// Package util provide some utility functions
|
||||||
package util
|
package util
|
||||||
|
|
||||||
// GetKeysFromSet define func to get all keys from a map[string]struct{}
|
import (
|
||||||
func GetKeysFromSet(set map[string]struct{}) []string {
|
"maps"
|
||||||
keys := make([]string, 0, len(set))
|
"slices"
|
||||||
for key := range set {
|
)
|
||||||
keys = append(keys, key)
|
|
||||||
}
|
// GetKeysFromSet define func to get all keys from a set-like map.
|
||||||
return keys
|
// It delegates to the standard library maps/slices helpers.
|
||||||
|
func GetKeysFromSet[K comparable, V any](set map[K]V) []K {
|
||||||
|
return slices.Collect(maps.Keys(set))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,9 @@
|
||||||
// Package util provide some utility functions
|
// Package util provide some utility functions
|
||||||
package util
|
package util
|
||||||
|
|
||||||
// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings
|
// RemoveTargetsFromSliceSimple define func to remove targets from a slice
|
||||||
func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string {
|
func RemoveTargetsFromSliceSimple[T comparable](targetsSlice []T, targetsToRemove []T) []T {
|
||||||
targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove))
|
targetsToRemoveSet := SliceToSet(targetsToRemove)
|
||||||
for _, target := range targetsToRemove {
|
|
||||||
targetsToRemoveSet[target] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := len(targetsSlice) - 1; i >= 0; i-- {
|
for i := len(targetsSlice) - 1; i >= 0; i-- {
|
||||||
if _, found := targetsToRemoveSet[targetsSlice[i]]; found {
|
if _, found := targetsToRemoveSet[targetsSlice[i]]; found {
|
||||||
|
|
@ -17,21 +14,21 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin
|
||||||
return targetsSlice
|
return targetsSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
// SliceToSet define func to convert string slice to set
|
// SliceToSet define func to convert a slice to a set
|
||||||
func SliceToSet(targetsSlice []string) map[string]struct{} {
|
func SliceToSet[T comparable](targetsSlice []T) map[T]struct{} {
|
||||||
set := make(map[string]struct{}, len(targetsSlice))
|
set := make(map[T]struct{}, len(targetsSlice))
|
||||||
for _, target := range targetsSlice {
|
for _, target := range targetsSlice {
|
||||||
set[target] = struct{}{}
|
set[target] = struct{}{}
|
||||||
}
|
}
|
||||||
return set
|
return set
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates
|
// DeduplicateAndReportDuplicates define func to deduplicate a slice and report duplicates
|
||||||
func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) {
|
func DeduplicateAndReportDuplicates[T comparable](targetsSlice []T, sourceSlice []T) (deduplicated []T, duplicates []T) {
|
||||||
targetSet := SliceToSet(targetsSlice)
|
targetSet := SliceToSet(targetsSlice)
|
||||||
deduplicated = make([]string, 0, len(sourceSlice))
|
deduplicated = make([]T, 0, len(sourceSlice))
|
||||||
// duplicate items slice
|
// duplicate items slice
|
||||||
duplicates = make([]string, 0, len(sourceSlice))
|
duplicates = make([]T, 0, len(sourceSlice))
|
||||||
|
|
||||||
for _, source := range sourceSlice {
|
for _, source := range sourceSlice {
|
||||||
if _, found := targetSet[source]; found {
|
if _, found := targetSet[source]; found {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
// Package util provide some utility functions
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"iter"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TypedMap define a type-safe generic wrapper around sync.Map.
|
||||||
|
// It keeps the concurrency guarantees of sync.Map while removing the
|
||||||
|
// per-call-site type assertions previously required for every Load.
|
||||||
|
type TypedMap[K comparable, V any] struct {
|
||||||
|
m sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load define func of return the value stored for key, and whether it was found.
|
||||||
|
func (t *TypedMap[K, V]) Load(key K) (V, bool) {
|
||||||
|
value, ok := t.m.Load(key)
|
||||||
|
if !ok {
|
||||||
|
var zero V
|
||||||
|
return zero, false
|
||||||
|
}
|
||||||
|
// safe: only values of type V are ever stored through this wrapper
|
||||||
|
return value.(V), true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store define func of set the value for key.
|
||||||
|
func (t *TypedMap[K, V]) Store(key K, value V) {
|
||||||
|
t.m.Store(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadOrStore define func of return the existing value for key if present.
|
||||||
|
// Otherwise it stores and returns the given value. loaded reports whether the
|
||||||
|
// value was already present.
|
||||||
|
func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
|
||||||
|
v, loaded := t.m.LoadOrStore(key, value)
|
||||||
|
return v.(V), loaded
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap define func of store value for key and return the previous value (if any).
|
||||||
|
// loaded reports whether a value was already present for key.
|
||||||
|
func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) {
|
||||||
|
prev, loaded := t.m.Swap(key, value)
|
||||||
|
if !loaded {
|
||||||
|
var zero V
|
||||||
|
return zero, false
|
||||||
|
}
|
||||||
|
return prev.(V), true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete define func of remove the value for key.
|
||||||
|
func (t *TypedMap[K, V]) Delete(key K) {
|
||||||
|
t.m.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Range define func of iterate over all key/value pairs.
|
||||||
|
// Iteration stops early if f returns false.
|
||||||
|
func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) {
|
||||||
|
t.m.Range(func(key, value any) bool {
|
||||||
|
return f(key.(K), value.(V))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len define func of return the number of key/value pairs currently stored.
|
||||||
|
// It walks the map, so the cost is O(n).
|
||||||
|
func (t *TypedMap[K, V]) Len() int {
|
||||||
|
count := 0
|
||||||
|
t.m.Range(func(_, _ any) bool {
|
||||||
|
count++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// All define func of return an iterator over all key/value pairs,
|
||||||
|
// usable directly with range-over-func. Iteration stops early if the
|
||||||
|
// consumer breaks out of the loop.
|
||||||
|
func (t *TypedMap[K, V]) All() iter.Seq2[K, V] {
|
||||||
|
return func(yield func(K, V) bool) {
|
||||||
|
t.m.Range(func(key, value any) bool {
|
||||||
|
return yield(key.(K), value.(V))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue