Compare commits

...

10 Commits

Author SHA1 Message Date
douxu ca68cf6c18 refactor: extend TypedMap and migrate MeasComputeState onto it
- add LoadOrStore, Len, and All (range-over-func) to util.TypedMap
  - embed util.TypedMap in MeasComputeState, dropping its hand-written
    sync.Map wrappers and per-call-site type assertions
  - iterate graphOverview via All() instead of Range in PrintGrapMap
  - remove unused Set/Comparer/OrderedSet/HashSet code from redis_zset.go
  - update deploy.md to replace Promtail with Grafana Alloy in the
    observability stack
2026-06-18 16:06:06 +08:00
douxu c82ad773a3 refactor: lowercase channel name suffixes and rename PS to PF
- change all ChannelSuffix values from uppercase to lowercase
  - rename ChannelSuffixPS ("PS") to ChannelSuffixPF ("pf")
  - align channel suffix naming with downstream measurement keys
2026-06-17 10:47:35 +08:00
douxu 82622d0d85 refactor: add generic helpers and type-safe TypedMap wrapper
- add util.TypedMap, a generic wrapper over sync.Map to drop call-site type assertions
  - add generic util.MapSlice and reuse it in ConvertZSetMembersToFloat64
  - make GetKeysFromSet/SliceToSet/RemoveTargetsFromSliceSimple/DeduplicateAndReportDuplicates generic
  - migrate graphOverview to util.TypedMap[int64, *Graph]
  - build redis suggestions via util.MapSlice in measurement group recommend
2026-06-16 16:15:28 +08:00
douxu 908c713565 chore: add rabbitmq cert secret script and plugins configmap
- add rabbitmq-certs-secret.sh helper to create the server cert secret
  - add rabbitmq-plugins-config.yaml ConfigMap enabling ssl auth, management,
    prometheus, and web dispatch plugins
  - rename rabbitmq Deployment from `eventrt-rabbitmq` to `rabbitmq`
  - document the secret-creation script in deploy.md
2026-06-12 11:20:58 +08:00
douxu 64b6562784 docs: overhaul deploy.md cleanup and pg verification sections
- add pg connection verification commands (pg_isready, psql queries)
  - renumber pg subsections (4.4.2→4.4.5) to accommodate new section
  - remove MongoDB deploy section (section 4.5) from modelRT deploy guide
  - remove MongoDB SSH tunnel port-forward entries (27017/30017)
  - rewrite section 8 cleanup guide: split into local Docker, local run,
    and K8s(Minikube) categories with scale-down and full-delete options
  - add one-liner kubectl delete -f deploy/k8s/ for full teardown
2026-06-10 16:42:29 +08:00
douxu 05c64dda14 chore: add imagePullPolicy and migrate WaitGroup to wg.Go
- add imagePullPolicy: IfNotPresent to all k8s Deployments, DaemonSet
    (grafana, jaeger, loki, rabbitmq, redis, promtail)
  - migrate wg.Add(1)/go/defer wg.Done() pattern to wg.Go() (Go 1.25+)
    in logger/loki_syncer.go and task/worker.go
  - simplify redundant map existence check before delete in diagram/graph.go
  - update deploy.md to reflect pg PVC size (6Gi) and resource limits
2026-06-10 16:40:50 +08:00
douxu c4e892f1c7 fix: correct typo in Jaeger K8s service name
- rename `jaeger-serivce` to `jaeger-service` in jaeger-service.yaml
2026-06-08 17:05:21 +08:00
douxu 195150d9b1 fix: fix K8s service names, deployment command, and GORM logger
- rename all K8s services to xxx-service convention and update
    all configmap references (postgres, mongodb, loki, jaeger)
  - add explicit command: ["/app/modelrt"] to deployment to prevent
    args from being treated as the executable (no ENTRYPOINT in
    Dockerfile)
  - set deploy_env to development to bypass Redis empty-password
    guard in non-production Minikube environment
  - fix GormLogger Info/Warn/Error to use fmt.Sprintf(msg, data...)
    so GORM printf-style messages are formatted correctly and avoid
    json: unsupported type: func() time.Time serialization panic
  - expand pg PVC storage from 2Gi to 6Gi
  - rename loop variable msg to task in PushTaskToRabbitMQ for clarity
  - align comment indentation in queue_producer.go
2026-06-03 17:11:54 +08:00
douxu 3309e53653 docs: document Dockerfile smoke tests and load workflow for Minikube
- add 3-stage build table (builder/certs/scratch) with image size note
  - add build-arg USER_ID override example in section 5.1
  - add section 5.1.1 with smoke-test commands (size check, inspect, dry
    run, full start)
  - add workflow for loading pre-built local images into Minikube
    directly
  - bump builder base image from golang:1.25-alpine to
    golang:1.26-alpine
  - normalize inline Dockerfile comments to lowercase
  - remove example config COPY from final scratch stage
2026-06-02 16:35:13 +08:00
douxu c6545e29ba style: normalize log messages to lowercase across task package
- lowercase first letter of all logger.Info/Warn/Error message strings
    in task/worker.go, task/retry_queue.go, task/handler_factory.go,
    task/metrics_logger.go, task/retry_manager.go, task/queue_producer.go,
    task/initializer.go, task/test_task.go, and main.go
  - fix inline comments in main.go that mixed Chinese and uppercase English
  - align Dockerfile comment casing with project convention
2026-06-01 15:50:11 +08:00
47 changed files with 555 additions and 419 deletions

1
.gitignore vendored
View File

@ -32,6 +32,7 @@ go.work
# ai config # ai config
.cursor/ .cursor/
.claude/ .claude/
.codewhale/
.cursorrules .cursorrules
.copilot/ .copilot/
.chatgpt/ .chatgpt/

View File

@ -19,15 +19,15 @@ const (
// channel name suffix // channel name suffix
const ( const (
ChannelSuffixP = "P" ChannelSuffixP = "p"
ChannelSuffixQ = "Q" ChannelSuffixQ = "q"
ChannelSuffixS = "S" ChannelSuffixS = "s"
ChannelSuffixPS = "PS" ChannelSuffixPF = "pf"
ChannelSuffixF = "F" ChannelSuffixF = "f"
ChannelSuffixDeltaF = "deltaF" ChannelSuffixDeltaF = "df"
ChannelSuffixUAB = "UAB" ChannelSuffixUAB = "uab"
ChannelSuffixUBC = "UBC" ChannelSuffixUBC = "ubc"
ChannelSuffixUCA = "UCA" ChannelSuffixUCA = "uca"
) )
const ( const (

View File

@ -640,6 +640,12 @@ openssl x509 -in eventrt_client_cert.pem -noout -subject
将服务器端三个证书文件打包为 K8s Secret在证书文件所在目录执行 将服务器端三个证书文件打包为 K8s Secret在证书文件所在目录执行
```bash
sh deploy/k8s/rabbitmq-certs-secret.sh
```
该脚本等价于:
```bash ```bash
kubectl create secret generic rabbitmq-certs \ kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \ --from-file=ca_certificate.pem=./ca_certificate.pem \
@ -695,7 +701,11 @@ kubectl apply -f deploy/k8s/pg-service.yaml
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` | | **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` | | **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret | | **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
| **存储** | `2Gi` | PVC `postgres-data` | | **存储** | `6Gi` | PVC `postgres-data` |
| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 |
| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 |
> **注意:** 密码当前以明文形式存储在 `pg-configmap.yaml` 中,生产环境应将其迁移至 K8s Secret并通过环境变量注入容器避免将明文密码提交至版本库。
##### 4.4.1 等待 Pod 就绪 ##### 4.4.1 等待 Pod 就绪
@ -703,7 +713,23 @@ kubectl apply -f deploy/k8s/pg-service.yaml
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
``` ```
##### 4.4.2 初始化异步任务表 ##### 4.4.2 连接验证
```bash
# 快速检查 PostgreSQL 是否接受连接
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- pg_isready -U postgres -d demo
# 进入 psql 执行简单查询确认数据库可用
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -d demo -c "SELECT current_database(), version();"
# 列出所有数据库(确认 demo 库已创建)
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -c "\l"
```
##### 4.4.3 初始化异步任务表
PostgreSQL 就绪后执行 1.4 节的建表 SQL可通过以下方式进入容器执行 PostgreSQL 就绪后执行 1.4 节的建表 SQL可通过以下方式进入容器执行
@ -717,14 +743,14 @@ kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metada
-- psql -U postgres -d demo < /path/to/init.sql -- psql -U postgres -d demo < /path/to/init.sql
``` ```
##### 4.4.3 状态检查 ##### 4.4.4 状态检查
```bash ```bash
kubectl get pods -l app=postgres kubectl get pods -l app=postgres
kubectl logs -l app=postgres --tail=30 kubectl logs -l app=postgres --tail=30
``` ```
##### 4.4.4 清理 ##### 4.4.5 清理
```bash ```bash
kubectl delete -f deploy/k8s/pg-service.yaml \ kubectl delete -f deploy/k8s/pg-service.yaml \
@ -733,68 +759,73 @@ kubectl delete -f deploy/k8s/pg-service.yaml \
-f deploy/k8s/pg-configmap.yaml -f deploy/k8s/pg-configmap.yaml
``` ```
#### 4.5 部署 MongoDB
```bash
kubectl apply -f deploy/k8s/mongodb-secret.yaml
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
kubectl apply -f deploy/k8s/mongodb-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
| **NodePort** | `30017` | 集群外访问端口 |
| **用户名** | `admin` | Root 管理员 |
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
| **存储** | `2Gi` | PVC `mongodb-data` |
> **注意:** 密码存储在 `mongodb-secret.yaml``stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
##### 4.5.1 等待 Pod 就绪
```bash
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
```
##### 4.5.2 连接验证
```bash
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
-- mongosh -u admin -p coslight --authenticationDatabase admin
```
##### 4.5.3 状态检查
```bash
kubectl get pods -l app=mongodb
kubectl logs -l app=mongodb --tail=30
```
##### 4.5.4 清理
```bash
kubectl delete -f deploy/k8s/mongodb-service.yaml \
-f deploy/k8s/mongodb-statefulset.yaml \
-f deploy/k8s/mongodb-pvc.yaml \
-f deploy/k8s/mongodb-secret.yaml
```
### 5\. 部署 ModelRTKubernetes ### 5\. 部署 ModelRTKubernetes
所有资源部署在 `default` 命名空间YAML 文件位于 `deploy/k8s/` 所有资源部署在 `default` 命名空间YAML 文件位于 `deploy/k8s/`
#### 5.1 构建并推送镜像 #### 5.1 构建并推送镜像
镜像采用三阶段构建,最终基于 `scratch`
| 阶段 | 基础镜像 | 作用 |
| :--- | :--- | :--- |
| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0``-trimpath -ldflags="-s -w"` |
| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义UID 默认 `1000` |
| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 |
**方式一:从源码构建并加载**
```bash ```bash
# 在项目根目录执行 # 在项目根目录执行(默认运行用户 UID=1000
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest . docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
# 推送到镜像仓库(或直接加载到 Minikube # 自定义运行用户 UID
docker build -f deploy/dockerfile/modelrt.Dockerfile \
--build-arg USER_ID=2000 \
-t coslight/modelrt:latest .
# 加载到 Minikube无需私有仓库
minikube image load coslight/modelrt:latest minikube image load coslight/modelrt:latest
``` ```
**方式二:直接加载已有本地镜像**
Ubuntu 宿主机上已存在构建好的镜像(如 `modelrt:v1`)时,无需重新构建,直接导入 Minikube
```bash
# 确认本地镜像存在
docker images modelrt:v1
# 加载到 Minikube
minikube image load modelrt:v1
# 验证镜像已进入 Minikube 缓存
minikube image ls | grep modelrt
```
> **注意:** `deploy/k8s/modelrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。
#### 5.1.1 镜像冒烟测试
```bash
# 查看镜像大小scratch 镜像预期 ≤ 25 MB
docker images coslight/modelrt:latest
# 检查镜像元信息(确认 User、Cmd、架构
docker inspect coslight/modelrt:latest
# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常)
docker run --rm coslight/modelrt:latest
# 挂载示例配置做完整启动验证Ctrl+C 退出)
docker run --rm \
-v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \
-p 8080:8080 \
coslight/modelrt:latest
```
> **注意:** `scratch` 镜像不含 shell无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`
#### 5.2 创建客户端证书 Secret #### 5.2 创建客户端证书 Secret
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行: 在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
@ -864,7 +895,9 @@ kubectl delete secret modelrt-certs
### 6\. 部署可观测性栈Kubernetes ### 6\. 部署可观测性栈Kubernetes
`Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/` `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`
> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`DaemonSet负责它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。
#### 6.1 部署 Jaeger #### 6.1 部署 Jaeger
@ -882,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml
kubectl apply -f deploy/k8s/loki-service.yaml kubectl apply -f deploy/k8s/loki-service.yaml
``` ```
#### 6.3 部署 Promtail #### 6.3 部署 Alloy
```bash ```bash
kubectl apply -f deploy/k8s/promtail-rbac.yaml kubectl apply -f deploy/k8s/alloy-rbac.yaml
kubectl apply -f deploy/k8s/promtail-configmap.yaml kubectl apply -f deploy/k8s/alloy-configmap.yaml
kubectl apply -f deploy/k8s/promtail-daemonset.yaml kubectl apply -f deploy/k8s/alloy-daemonset.yaml
``` ```
> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole``alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml``config.alloy` 中。
#### 6.4 部署 Grafana #### 6.4 部署 Grafana
```bash ```bash
@ -907,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \ -f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-deployment.yaml \ -f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-service.yaml \ -f deploy/k8s/loki-service.yaml \
-f deploy/k8s/promtail-rbac.yaml \ -f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/promtail-configmap.yaml \ -f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/promtail-daemonset.yaml \ -f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/grafana-configmap.yaml \ -f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/grafana-deployment.yaml \ -f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-service.yaml -f deploy/k8s/grafana-service.yaml
@ -955,7 +990,6 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101)
```bash ```bash
ssh -L 5432:192.168.49.2:30432 \ ssh -L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \ -L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \ -L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \ -L 6379:192.168.49.2:30001 \
@ -971,7 +1005,6 @@ ssh -L 5432:192.168.49.2:30432 \
```bash ```bash
ssh -fN \ ssh -fN \
-L 5432:192.168.49.2:30432 \ -L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \ -L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \ -L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \ -L 6379:192.168.49.2:30001 \
@ -987,7 +1020,6 @@ ssh -fN \
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 | | Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
| :--- | :--- | :--- | :--- | | :--- | :--- | :--- | :--- |
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` | | `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` |
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 | | `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` | | `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 | | `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
@ -1011,14 +1043,111 @@ kill <PID>
### 8\. 后续操作(停止与清理) ### 8\. 后续操作(停止与清理)
#### 8.1 停止容器 #### 8.1 本地 Docker 部署清理
适用于第 1、2 节使用 `docker run` 启动的 PostgreSQL 和 Redis 容器。
```bash ```bash
# 停止容器
docker stop postgres redis docker stop postgres redis
```
#### 8.2 删除容器(删除后数据将丢失) # 删除容器(容器内数据将同步丢失)
```bash
docker rm postgres redis docker rm postgres redis
``` ```
#### 8.2 本地运行清理
适用于第 3 节以 `go run` 或编译后二进制方式在本地启动的 ModelRT 服务。
前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程:
```bash
# 终止 go run 启动的进程
pkill -f "go run main.go"
# 或终止编译后的二进制进程
pkill model-rt
```
#### 8.3 K8s(Minikube) 部署清理
适用于第 4、5、6 节在 Minikube 中部署的所有资源。
##### 8.3.1 分服务清理
**仅停止(缩容至 0PVC 数据保留)**
将所有 Deployment 和 StatefulSet 缩容至 0 副本Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。
```bash
# 停止所有 DeploymentRedis / RabbitMQ / ModelRT / Jaeger / Loki / Grafana
kubectl scale deployment --all --replicas=0
# 停止所有 StatefulSetPostgreSQLPVC 数据保留)
kubectl scale statefulset --all --replicas=0
```
恢复时:
```bash
kubectl scale deployment --all --replicas=1
kubectl scale statefulset --all --replicas=1
```
> **注意:** DaemonSetAlloy无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。
---
**永久清理(删除所有资源,包含 PVC数据不可恢复**
按部署顺序反向删除各服务资源:
```bash
# 可观测性栈Grafana / Alloy / Loki / Jaeger
kubectl delete -f deploy/k8s/grafana-service.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-configmap.yaml \
-f deploy/k8s/jaeger-service.yaml \
-f deploy/k8s/jaeger-deployment.yaml
# ModelRT 应用
kubectl delete -f deploy/k8s/modelrt-service.yaml \
-f deploy/k8s/modelrt-deployment.yaml \
-f deploy/k8s/modelrt-configmap.yaml \
-f deploy/k8s/modelrt-secret.yaml
kubectl delete secret modelrt-certs
# PostgreSQL
kubectl delete -f deploy/k8s/pg-service.yaml \
-f deploy/k8s/pg-statefulset.yaml \
-f deploy/k8s/pg-pvc.yaml \
-f deploy/k8s/pg-configmap.yaml
# RabbitMQ
kubectl delete -f deploy/k8s/rabbitmq-service.yaml \
-f deploy/k8s/rabbitmq-deployment.yaml \
-f deploy/k8s/rabbitmq-users-config.yaml \
-f deploy/k8s/rabbitmq-config.yaml \
-f deploy/k8s/rabbitmq-secret.yaml
kubectl delete secret rabbitmq-certs
# Redis
kubectl delete -f deploy/k8s/redis-service.yaml \
-f deploy/k8s/redis-deployment.yaml
```
##### 8.3.2 一键清理
> **注意:** 此操作会删除 `deploy/k8s/` 下所有 YAML 对应的 K8s 资源,包括 PVC**持久化数据将永久丢失**,请确认后执行。
```bash
kubectl delete -f deploy/k8s/
kubectl delete secret rabbitmq-certs modelrt-certs
```

View File

@ -1,4 +1,4 @@
FROM golang:1.25-alpine AS builder FROM golang:1.26-alpine AS builder
RUN apk --no-cache upgrade RUN apk --no-cache upgrade
WORKDIR /app WORKDIR /app
@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \
-mod=readonly \ -mod=readonly \
-o modelrt main.go -o modelrt main.go
# Prepare runtime dependencies in a pinned Alpine stage so they can be # prepare runtime dependencies in a pinned alpine stage so they can be
# copied into scratch without pulling any vulnerable OS packages at run time. # copied into scratch without pulling any vulnerable os packages at run time.
FROM alpine:3.21 AS certs FROM alpine:3.21 AS certs
ARG USER_ID=1000 ARG USER_ID=1000
RUN apk --no-cache add ca-certificates tzdata && \ RUN apk --no-cache add ca-certificates tzdata && \
@ -21,15 +21,14 @@ RUN apk --no-cache add ca-certificates tzdata && \
FROM scratch FROM scratch
# CA certificates required for TLS connections (RabbitMQ amqps://) # CA certificates required for TLS connections (RabbitMQ amqps://)
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Timezone data # timezone data
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
# Non-root user/group definitions # non-root user/group definitions
COPY --from=certs /etc/passwd /etc/passwd COPY --from=certs /etc/passwd /etc/passwd
COPY --from=certs /etc/group /etc/group COPY --from=certs /etc/group /etc/group
WORKDIR /app WORKDIR /app
COPY --from=builder /app/modelrt ./modelrt COPY --from=builder /app/modelrt ./modelrt
COPY configs/config.example.yaml ./configs/config.example.yaml
USER modelrt USER modelrt
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"] CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]

View File

@ -10,7 +10,7 @@ data:
- name: Loki - name: Loki
type: loki type: loki
access: proxy access: proxy
url: http://loki:3100 url: http://loki-service:3100
isDefault: true isDefault: true
jsonData: jsonData:
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger # derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
@ -23,4 +23,4 @@ data:
type: jaeger type: jaeger
uid: jaeger uid: jaeger
access: proxy access: proxy
url: http://jaeger:16686 url: http://jaeger-service:16686

View File

@ -16,6 +16,7 @@ spec:
containers: containers:
- name: grafana - name: grafana
image: grafana/grafana:10.4.2 image: grafana/grafana:10.4.2
imagePullPolicy: IfNotPresent
ports: ports:
- containerPort: 3000 - containerPort: 3000
env: env:

View File

@ -1,14 +1,14 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: grafana name: grafana-service
namespace: default namespace: default
spec: spec:
ports: ports:
- name: http - name: http
port: 3000 port: 3000
targetPort: 3000 targetPort: 3000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000 nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
selector: selector:
app: grafana app: grafana
type: NodePort type: NodePort

View File

@ -15,6 +15,7 @@ spec:
containers: containers:
- name: jaeger - name: jaeger
image: jaegertracing/all-in-one:1.56 image: jaegertracing/all-in-one:1.56
imagePullPolicy: IfNotPresent
env: env:
- name: COLLECTOR_OTLP_ENABLED - name: COLLECTOR_OTLP_ENABLED
value: "true" value: "true"

View File

@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: jaeger name: jaeger-service
labels: labels:
app: jaeger app: jaeger
spec: spec:
@ -9,19 +9,19 @@ spec:
- name: ui - name: ui
port: 16686 port: 16686
targetPort: 16686 targetPort: 16686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686 nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
- name: collector-http - name: collector-http
port: 14268 port: 14268
targetPort: 14268 targetPort: 14268
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
- name: otlp-http - name: otlp-http
port: 4318 port: 4318
targetPort: 4318 targetPort: 4318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318 nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
- name: otlp-grpc - name: otlp-grpc
port: 4317 port: 4317
targetPort: 4317 targetPort: 4317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317 nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
selector: selector:
app: jaeger app: jaeger
type: NodePort type: NodePort

View File

@ -20,6 +20,7 @@ spec:
containers: containers:
- name: loki - name: loki
image: grafana/loki:2.9.4 image: grafana/loki:2.9.4
imagePullPolicy: IfNotPresent
args: args:
- -config.file=/etc/loki/loki.yaml - -config.file=/etc/loki/loki.yaml
ports: ports:

View File

@ -1,14 +1,14 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: loki name: loki-service
namespace: default namespace: default
spec: spec:
ports: ports:
- name: http - name: http
port: 3100 port: 3100
targetPort: 3100 targetPort: 3100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100 nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
selector: selector:
app: loki app: loki
type: NodePort type: NodePort

View File

@ -5,7 +5,7 @@ metadata:
data: data:
config.yaml: | config.yaml: |
postgres: postgres:
host: "192.168.1.101" host: "postgres-service"
port: 5432 port: 5432
database: "demo" database: "demo"
user: "postgres" user: "postgres"
@ -35,7 +35,7 @@ data:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled endpoint: "" # Promtail handles log collection in K8s, direct push disabled
otel: otel:
endpoint: "jaeger:4318" endpoint: "jaeger-service:4318"
insecure: true insecure: true
ants: ants:
@ -77,7 +77,7 @@ data:
service_addr: ":8080" service_addr: ":8080"
service_name: "modelRT" service_name: "modelRT"
secret_key: "" # injected via env SERVICE_SECRET_KEY secret_key: "" # injected via env SERVICE_SECRET_KEY
deploy_env: "production" deploy_env: "development"
dataRT: dataRT:
host: "http://127.0.0.1" host: "http://127.0.0.1"

View File

@ -16,8 +16,9 @@ spec:
spec: spec:
containers: containers:
- name: modelrt - name: modelrt
image: coslight/modelrt:latest image: modelrt:v1
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
command: ["/app/modelrt"]
args: args:
- "-modelRT_config_dir=/app/configs" - "-modelRT_config_dir=/app/configs"
- "-modelRT_config_name=config" - "-modelRT_config_name=config"

View File

@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: mongodb name: mongodb-service
labels: labels:
app: mongodb app: mongodb
spec: spec:

View File

@ -34,9 +34,9 @@ spec:
- mongosh - mongosh
- --eval - --eval
- "db.adminCommand('ping')" - "db.adminCommand('ping')"
initialDelaySeconds: 10 initialDelaySeconds: 30
periodSeconds: 5 periodSeconds: 10
timeoutSeconds: 3 timeoutSeconds: 10
failureThreshold: 12 failureThreshold: 12
livenessProbe: livenessProbe:
exec: exec:
@ -44,10 +44,10 @@ spec:
- mongosh - mongosh
- --eval - --eval
- "db.adminCommand('ping')" - "db.adminCommand('ping')"
initialDelaySeconds: 30 initialDelaySeconds: 120
periodSeconds: 20 periodSeconds: 10
timeoutSeconds: 3 timeoutSeconds: 30
failureThreshold: 3 failureThreshold: 5
resources: resources:
requests: requests:
cpu: 100m cpu: 100m

View File

@ -7,4 +7,4 @@ spec:
- ReadWriteOnce - ReadWriteOnce
resources: resources:
requests: requests:
storage: 2Gi storage: 6Gi

View File

@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: postgres name: postgres-service
labels: labels:
app: postgres app: postgres
spec: spec:

View File

@ -13,7 +13,7 @@ data:
filename: /tmp/positions.yaml filename: /tmp/positions.yaml
clients: clients:
- url: http://loki:3100/loki/api/v1/push - url: http://loki-service:3100/loki/api/v1/push
scrape_configs: scrape_configs:
- job_name: kubernetes-pods - job_name: kubernetes-pods

View File

@ -19,6 +19,7 @@ spec:
containers: containers:
- name: promtail - name: promtail
image: grafana/promtail:2.9.4 image: grafana/promtail:2.9.4
imagePullPolicy: IfNotPresent
args: args:
- -config.file=/etc/promtail/promtail.yaml - -config.file=/etc/promtail/promtail.yaml
ports: ports:

View File

@ -0,0 +1,14 @@
#!/bin/sh
# Create the rabbitmq server certificate secret.
# Run this script from the directory that contains the three cert files,
# or adjust the paths below to point at the actual files.
#
# Expected files (generated during RabbitMQ TLS setup):
# ca_certificate.pem
# server_certificate.pem
# server_key.pem
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=server_certificate.pem=./server_certificate.pem \
--from-file=server_key.pem=./server_key.pem

View File

@ -1,7 +1,7 @@
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: eventrt-rabbitmq name: rabbitmq
spec: spec:
replicas: 1 replicas: 1
selector: selector:
@ -15,6 +15,7 @@ spec:
containers: containers:
- name: rabbitmq - name: rabbitmq
image: rabbitmq:4.1.1-management-alpine image: rabbitmq:4.1.1-management-alpine
imagePullPolicy: IfNotPresent
ports: ports:
- containerPort: 4369 - containerPort: 4369
- containerPort: 5671 - containerPort: 5671

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbit-plugins-conf
data:
enabled_plugins: |
[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch].

View File

@ -15,6 +15,7 @@ spec:
containers: containers:
- name: redis - name: redis
image: redis/redis-stack-server:latest image: redis/redis-stack-server:latest
imagePullPolicy: IfNotPresent
resources: resources:
limits: limits:
memory: "128Mi" memory: "128Mi"

View File

@ -2,24 +2,20 @@
package diagram package diagram
import ( import (
"errors"
"fmt" "fmt"
"sync"
"modelRT/util"
) )
// anchorValueOverview define struct of storage all anchor value // anchorValueOverview define struct of storage all anchor value keyed by component uuid
var anchorValueOverview sync.Map var anchorValueOverview util.TypedMap[string, string]
// GetAnchorValue define func of get circuit diagram data by componentID // GetAnchorValue define func of get circuit diagram data by componentID
func GetAnchorValue(componentUUID string) (string, error) { func GetAnchorValue(componentUUID string) (string, error) {
value, ok := diagramsOverview.Load(componentUUID) anchorValue, ok := anchorValueOverview.Load(componentUUID)
if !ok { if !ok {
return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID) return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID)
} }
anchorValue, ok := value.(string)
if !ok {
return "", errors.New("convert to string failed")
}
return anchorValue, nil return anchorValue, nil
} }

View File

@ -2,32 +2,27 @@
package diagram package diagram
import ( import (
"errors"
"fmt" "fmt"
"sync"
"modelRT/orm" "modelRT/orm"
"modelRT/util"
) )
// diagramsOverview define struct of storage all circuit diagram data // diagramsOverview define struct of storage all circuit diagram data keyed by component uuid
var diagramsOverview sync.Map var diagramsOverview util.TypedMap[string, *orm.Component]
// GetComponentMap define func of get circuit diagram data by component uuid // GetComponentMap define func of get circuit diagram data by component uuid
func GetComponentMap(componentUUID string) (*orm.Component, error) { func GetComponentMap(componentUUID string) (*orm.Component, error) {
value, ok := diagramsOverview.Load(componentUUID) componentInfo, ok := diagramsOverview.Load(componentUUID)
if !ok { if !ok {
return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID) return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID)
} }
componentInfo, ok := value.(*orm.Component)
if !ok {
return nil, errors.New("convert to component map struct failed")
}
return componentInfo, nil return componentInfo, nil
} }
// UpdateComponentMap define func of update circuit diagram data by component uuid and component info // UpdateComponentMap define func of update circuit diagram data by component uuid and component info
func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool { func UpdateComponentMap(componentUUID string, componentInfo *orm.Component) bool {
_, result := diagramsOverview.Swap(componentID, componentInfo) _, result := diagramsOverview.Swap(componentUUID, componentInfo)
return result return result
} }

View File

@ -65,9 +65,7 @@ func (g *Graph) AddEdge(from, to uuid.UUID) {
// 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中 // 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中
// 则将其移除 // 则将其移除
if _, exist := g.FreeVertexs[toKey]; exist { delete(g.FreeVertexs, toKey)
delete(g.FreeVertexs, toKey)
}
} }
// DelNode delete a node to the graph // DelNode delete a node to the graph

View File

@ -3,8 +3,6 @@ package diagram
import ( import (
"context" "context"
"iter"
"maps"
locker "modelRT/distributedlock" locker "modelRT/distributedlock"
"modelRT/logger" "modelRT/logger"
@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error)
} }
return results, nil return results, nil
} }
type Comparer[T any] interface {
Compare(T) int
}
type ComparableComparer[T any] interface {
Compare(T) int
comparable // 直接嵌入 comparable 约束
}
type methodNode[E Comparer[E]] struct {
value E
left *methodNode[E]
right *methodNode[E]
}
type MethodTree[E Comparer[E]] struct {
root *methodNode[E]
}
type OrderedSet[E interface {
comparable
Comparer[E]
}] struct {
tree MethodTree[E]
elements map[E]bool
}
type ComparableOrderedSet[E ComparableComparer[E]] struct {
tree MethodTree[E]
elements map[E]bool
}
type Set[E any] interface {
Insert(E)
Delete(E)
Has(E) bool
All() iter.Seq[E]
}
func InsertAll[E any](set Set[E], seq iter.Seq[E]) {
for v := range seq {
set.Insert(v)
}
}
type HashSet[E comparable] map[E]bool
func (s HashSet[E]) Insert(v E) { s[v] = true }
func (s HashSet[E]) Delete(v E) { delete(s, v) }
func (s HashSet[E]) Has(v E) bool { return s[v] }
func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) }

View File

@ -2,32 +2,27 @@
package diagram package diagram
import ( import (
"errors"
"fmt" "fmt"
"sync"
"modelRT/util"
) )
// graphOverview define struct of storage all circuit diagram topologic data // graphOverview define struct of storage all circuit diagram topologic data keyed by pageID
var graphOverview sync.Map var graphOverview util.TypedMap[int64, *Graph]
// PrintGrapMap define func of print circuit diagram topologic info data // PrintGrapMap define func of print circuit diagram topologic info data
func PrintGrapMap() { func PrintGrapMap() {
graphOverview.Range(func(key, value any) bool { for pageID, graph := range graphOverview.All() {
fmt.Println(key, value) fmt.Println(pageID, graph)
return true }
})
} }
// GetGraphMap define func of get circuit diagram topologic data by pageID // GetGraphMap define func of get circuit diagram topologic data by pageID
func GetGraphMap(pageID int64) (*Graph, error) { func GetGraphMap(pageID int64) (*Graph, error) {
value, ok := graphOverview.Load(pageID) graph, ok := graphOverview.Load(pageID)
if !ok { if !ok {
return nil, fmt.Errorf("can not find graph by pageID:%d", pageID) return nil, fmt.Errorf("can not find graph by pageID:%d", pageID)
} }
graph, ok := value.(*Graph)
if !ok {
return nil, errors.New("convert to graph struct failed")
}
return graph, nil return graph, nil
} }

View File

@ -137,7 +137,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp) c.JSON(http.StatusOK, resp)
return return
} }
diagram.UpdateComponentMap(info.ID, component) diagram.UpdateComponentMap(info.UUID, component)
} }
if len(request.FreeVertexs) > 0 { if len(request.FreeVertexs) > 0 {

View File

@ -4,6 +4,7 @@ package logger
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface {
// Info define func for implementing gormLogger.Interface // Info define func for implementing gormLogger.Interface
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) { func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
Info(ctx, msg, "data", data) Info(ctx, fmt.Sprintf(msg, data...))
} }
// Warn define func for implementing gormLogger.Interface // Warn define func for implementing gormLogger.Interface
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) { func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
Warn(ctx, msg, "data", data) Warn(ctx, fmt.Sprintf(msg, data...))
} }
// Error define func for implementing gormLogger.Interface // Error define func for implementing gormLogger.Interface
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) { func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
Error(ctx, msg, "data", data) Error(ctx, fmt.Sprintf(msg, data...))
} }
// Trace define func for implementing gormLogger.Interface // Trace define func for implementing gormLogger.Interface

View File

@ -47,8 +47,7 @@ func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
client: &http.Client{Timeout: 5 * time.Second}, client: &http.Client{Timeout: 5 * time.Second},
ch: make(chan string, 512), ch: make(chan string, 512),
} }
ls.wg.Add(1) ls.wg.Go(ls.run)
go ls.run()
return ls return ls
} }
@ -70,7 +69,6 @@ func (ls *lokiSyncer) Sync() error {
} }
func (ls *lokiSyncer) run() { func (ls *lokiSyncer) run() {
defer ls.wg.Done()
ticker := time.NewTicker(2 * time.Second) ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop() defer ticker.Stop()

View File

@ -179,8 +179,8 @@ func main() {
// init async task worker // init async task worker
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient) taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
if err != nil { if err != nil {
logger.Error(ctx, "Failed to initialize task worker", "error", err) logger.Error(ctx, "failed to initialize task worker", "error", err)
// Continue without task worker, but log warning // continue without task worker, but log warning
} else { } else {
go taskWorker.Start() go taskWorker.Start()
defer taskWorker.Stop() defer taskWorker.Stop()
@ -258,7 +258,7 @@ func main() {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
engine := gin.New() engine := gin.New()
// 添加CORS中间件 // add CORS middleware
engine.Use(cors.New(cors.Config{ engine.Use(cors.New(cors.Config{
AllowOrigins: []string{"*"}, // 或指定具体域名 AllowOrigins: []string{"*"}, // 或指定具体域名
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
@ -278,7 +278,7 @@ func main() {
Handler: engine, Handler: engine,
} }
// creating a System Signal Receiver // creating a system signal receiver
done := make(chan os.Signal, 10) done := make(chan os.Signal, 10)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() { go func() {

View File

@ -10,6 +10,7 @@ import (
"modelRT/diagram" "modelRT/diagram"
"modelRT/logger" "modelRT/logger"
"modelRT/orm" "modelRT/orm"
"modelRT/util"
"github.com/RediSearch/redisearch-go/v2/redisearch" "github.com/RediSearch/redisearch-go/v2/redisearch"
) )
@ -63,10 +64,9 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
} }
safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags) safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags)
gridSug := make([]redisearch.Suggestion, 0, len(measSet.AllGridTags)) gridSug := util.MapSlice(measSet.AllGridTags, func(gridTag string) redisearch.Suggestion {
for _, gridTag := range measSet.AllGridTags { return redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}
gridSug = append(gridSug, redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}) })
}
ac.AddTerms(gridSug...) ac.AddTerms(gridSug...)
safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags) safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags)
@ -78,19 +78,16 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
// building the grid -> zones hierarchy // building the grid -> zones hierarchy
for gridTag, zoneTags := range measSet.GridToZoneTags { for gridTag, zoneTags := range measSet.GridToZoneTags {
sug := make([]redisearch.Suggestion, 0, len(zoneTags)) // add redis fuzzy search suggestion for token1-token7 type
for _, zoneTag := range zoneTags { sug := util.MapSlice(zoneTags, func(zoneTag string) redisearch.Suggestion {
term := fmt.Sprintf("%s.%s", gridTag, zoneTag) return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s", gridTag, zoneTag), Score: constants.DefaultScore}
// add redis fuzzy search suggestion for token1-token7 type })
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
}
safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags) safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags)
ac.AddTerms(sug...) ac.AddTerms(sug...)
} }
// building the zone -> stations hierarchy // building the zone -> stations hierarchy
for zoneTag, stationTags := range measSet.ZoneToStationTags { for zoneTag, stationTags := range measSet.ZoneToStationTags {
sug := make([]redisearch.Suggestion, 0, len(stationTags))
gridTag, exists := zoneToGridPath[zoneTag] gridTag, exists := zoneToGridPath[zoneTag]
if !exists { if !exists {
err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag) err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag)
@ -98,11 +95,10 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
return nil, nil, err return nil, nil, err
} }
for _, stationTag := range stationTags { // add redis fuzzy search suggestion for token1-token7 type
// add redis fuzzy search suggestion for token1-token7 type sug := util.MapSlice(stationTags, func(stationTag string) redisearch.Suggestion {
term := fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag) return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag), Score: constants.DefaultScore}
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore}) })
}
safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags) safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags)
ac.AddTerms(sug...) ac.AddTerms(sug...)

View File

@ -59,36 +59,36 @@ func NewPower104DataSource(station string, packet, offset int) (*MeasurementData
} }
func generateChannelName(prefix string, number int, suffix string) (string, error) { func generateChannelName(prefix string, number int, suffix string) (string, error) {
// shortPrefix is the literal prefix written into the channel name (tm/ts/tc),
// maxNumber is the inclusive upper bound, padWidth is the zero-padded digit width.
var shortPrefix string
var maxNumber, padWidth int
switch prefix { switch prefix {
case constants.ChannelPrefixTelemetry: case constants.ChannelPrefixTelemetry:
if number > 10 { shortPrefix, maxNumber, padWidth = "tm", 8, 1
return "", common.ErrExceedsLimitType
}
var builder strings.Builder
numberStr := strconv.Itoa(number)
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
builder.WriteString(prefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
channelName := builder.String()
return channelName, nil
case constants.ChannelPrefixTelesignal: case constants.ChannelPrefixTelesignal:
var numberStr string shortPrefix, maxNumber, padWidth = "ts", 16, 2
if number < 10 { case constants.ChannelPrefixTelecommand:
numberStr = "0" + strconv.Itoa(number) shortPrefix, maxNumber, padWidth = "tc", 9, 1
}
numberStr = strconv.Itoa(number)
var builder strings.Builder
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
builder.WriteString(prefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
channelName := builder.String()
return channelName, nil
default: default:
return "", common.ErrUnsupportedChannelPrefixType return "", common.ErrUnsupportedChannelPrefixType
} }
if number < 1 || number > maxNumber {
return "", common.ErrExceedsLimitType
}
numberStr := strconv.Itoa(number)
if len(numberStr) < padWidth {
numberStr = strings.Repeat("0", padWidth-len(numberStr)) + numberStr
}
var builder strings.Builder
builder.Grow(len(shortPrefix) + len(numberStr) + len(suffix))
builder.WriteString(shortPrefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
return builder.String(), nil
} }
// NewTelemetryChannel define func of generate telemetry channel CL3611 data source // NewTelemetryChannel define func of generate telemetry channel CL3611 data source
@ -109,6 +109,15 @@ func NewTelesignalChannel(station, device, channelNameSuffix string, channelNumb
return NewCL3611DataSource(station, device, channelName) return NewCL3611DataSource(station, device, channelName)
} }
// NewTelecommandChannel define func of generate telecommand channel CL3611 data source
func NewTelecommandChannel(station, device, channelNameSuffix string, channelNumber int) (*MeasurementDataSource, error) {
channelName, err := generateChannelName(constants.ChannelPrefixTelecommand, channelNumber, channelNameSuffix)
if err != nil {
return nil, fmt.Errorf("failed to generate channel name: %w", err)
}
return NewCL3611DataSource(station, device, channelName)
}
// NewStandardChannel define func of generate standard channel CL3611 data source // NewStandardChannel define func of generate standard channel CL3611 data source
func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) { func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) {
return NewCL3611DataSource(station, device, channelType) return NewCL3611DataSource(station, device, channelType)
@ -264,9 +273,9 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) {
func concatP104WithPlus(station string, packet int, offset int) string { func concatP104WithPlus(station string, packet int, offset int) string {
packetStr := strconv.Itoa(packet) packetStr := strconv.Itoa(packet)
offsetStr := strconv.Itoa(offset) offsetStr := strconv.Itoa(offset)
return station + ":" + packetStr + ":" + offsetStr return strings.ToLower(station + ":104:" + packetStr + ":" + offsetStr)
} }
func concatCL361WithPlus(station, device, channel string) string { func concatCL361WithPlus(station, device, channel string) string {
return station + ":" + device + ":" + "phasor" + ":" + channel return strings.ToLower(station + ":" + device + ":" + "phasor" + ":" + channel)
} }

View File

@ -2,7 +2,7 @@
package realtimedata package realtimedata
import ( import (
"sync" "modelRT/util"
) )
// ComputeConfig define struct of measurement computation // ComputeConfig define struct of measurement computation
@ -19,54 +19,15 @@ type ComputeConfig struct {
Analyzer RealTimeAnalyzer Analyzer RealTimeAnalyzer
} }
// MeasComputeState define struct of manages the state of measurement computations using sync.Map // MeasComputeState define struct of manages the state of measurement
// computations. It embeds util.TypedMap to inherit the concurrency-safe,
// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without
// per-call-site type assertions.
type MeasComputeState struct { type MeasComputeState struct {
measMap sync.Map util.TypedMap[string, *ComputeConfig]
} }
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState // NewMeasComputeState define func to create and returns a new instance of MeasComputeState
func NewMeasComputeState() *MeasComputeState { func NewMeasComputeState() *MeasComputeState {
return &MeasComputeState{} return &MeasComputeState{}
} }
// Store define func to store a compute configuration for the specified key
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
m.measMap.Store(key, config)
}
// Load define func to retrieve the compute configuration for the specified key
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
value, ok := m.measMap.Load(key)
if !ok {
return nil, false
}
return value.(*ComputeConfig), true
}
// Delete define func to remove the compute configuration for the specified key
func (m *MeasComputeState) Delete(key string) {
m.measMap.Delete(key)
}
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
value, loaded := m.measMap.LoadOrStore(key, config)
return value.(*ComputeConfig), loaded
}
// Range define func to iterate over all key-configuration pairs in the map
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
m.measMap.Range(func(key, value any) bool {
return f(key.(string), value.(*ComputeConfig))
})
}
// Len define func to return the number of compute configurations in the map
func (m *MeasComputeState) Len() int {
count := 0
m.measMap.Range(func(_, _ any) bool {
count++
return true
})
return count
}

View File

@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType,
defer f.mu.Unlock() defer f.mu.Unlock()
f.handlers[taskType] = handler f.handlers[taskType] = handler
logger.Info(ctx, "Handler registered", logger.Info(ctx, "handler registered",
"task_type", taskType, "task_type", taskType,
"handler_name", handler.Name(), "handler_name", handler.Name(),
) )
@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler {
// Execute processes an event analysis task // Execute processes an event analysis task
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Starting event analysis", logger.Info(ctx, "starting event analysis",
"task_id", taskID, "task_id", taskID,
"task_params", params, "task_params", params,
) )
@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa
// 4. Storing results in database // 4. Storing results in database
// Simulate work // Simulate work
logger.Info(ctx, "Event analysis completed", logger.Info(ctx, "event analysis completed",
"task_id", taskID, "task_id", taskID,
"task_params", params, "task_params", params,
"db", db, "db", db,
@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler {
// Execute processes a batch import task // Execute processes a batch import task
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Starting batch import", logger.Info(ctx, "starting batch import",
"task_id", taskID, "task_id", taskID,
"task_params", params, "task_params", params,
"db", db, "db", db,
@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para
// 4. Generating import report // 4. Generating import report
// Simulate work // Simulate work
logger.Info(ctx, "Batch import completed", logger.Info(ctx, "batch import completed",
"task_id", taskID, "task_id", taskID,
"task_params", params, "task_params", params,
"db", db, "db", db,

View File

@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D
return nil, fmt.Errorf("failed to create task worker: %w", err) return nil, fmt.Errorf("failed to create task worker: %w", err)
} }
logger.Info(ctx, "Task worker initialized", logger.Info(ctx, "task worker initialized",
"worker_pool_size", workerCfg.PoolSize, "worker_pool_size", workerCfg.PoolSize,
"queue_consumers", workerCfg.QueueConsumerCount, "queue_consumers", workerCfg.QueueConsumerCount,
) )

View File

@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger {
// LogTaskMetrics records task processing metrics // LogTaskMetrics records task processing metrics
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) { func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
logger.Info(m.ctx, "Task metrics", logger.Info(m.ctx, "task metrics",
"task_type", taskType, "task_type", taskType,
"status", status, "status", status,
"processing_time_ms", processingTime.Milliseconds(), "processing_time_ms", processingTime.Milliseconds(),
@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process
// LogQueueMetrics records queue metrics // LogQueueMetrics records queue metrics
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) { func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
logger.Info(m.ctx, "Queue metrics", logger.Info(m.ctx, "queue metrics",
"queue_depth", queueDepth, "queue_depth", queueDepth,
"queue_latency_ms", queueLatency.Milliseconds(), "queue_latency_ms", queueLatency.Milliseconds(),
"metric_type", "queue", "metric_type", "queue",
@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati
// LogWorkerMetrics records worker metrics // LogWorkerMetrics records worker metrics
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) { func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
logger.Info(m.ctx, "Worker metrics", logger.Info(m.ctx, "worker metrics",
"active_workers", activeWorkers, "active_workers", activeWorkers,
"idle_workers", idleWorkers, "idle_workers", idleWorkers,
"total_workers", totalWorkers, "total_workers", totalWorkers,
@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker
// LogRetryMetrics records retry metrics // LogRetryMetrics records retry metrics
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) { func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
logger.Info(m.ctx, "Retry metrics", logger.Info(m.ctx, "retry metrics",
"task_type", taskType, "task_type", taskType,
"retry_count", retryCount, "retry_count", retryCount,
"retry_success", success, "retry_success", success,
@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
var memStats runtime.MemStats var memStats runtime.MemStats
runtime.ReadMemStats(&memStats) runtime.ReadMemStats(&memStats)
logger.Info(m.ctx, "System metrics", logger.Info(m.ctx, "system metrics",
"metric_type", "system", "metric_type", "system",
"timestamp", time.Now().Unix(), "timestamp", time.Now().Unix(),
"goroutines", runtime.NumGoroutine(), "goroutines", runtime.NumGoroutine(),
@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) { func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
duration := endTime.Sub(startTime) duration := endTime.Sub(startTime)
logger.Info(m.ctx, "Task completion metrics", logger.Info(m.ctx, "task completion metrics",
"metric_type", "task_completion", "metric_type", "task_completion",
"timestamp", time.Now().Unix(), "timestamp", time.Now().Unix(),
"task_id", taskID, "task_id", taskID,
@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string
// LogHealthCheckMetrics records health check metrics // LogHealthCheckMetrics records health check metrics
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) { func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
logger.Info(m.ctx, "Health check metrics", logger.Info(m.ctx, "health check metrics",
"metric_type", "health_check", "metric_type", "health_check",
"timestamp", time.Now().Unix(), "timestamp", time.Now().Unix(),
"healthy", healthy, "healthy", healthy,

View File

@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable direct exchange // Declare durable direct exchange
err := p.ch.ExchangeDeclare( err := p.ch.ExchangeDeclare(
constants.TaskExchangeName, // name constants.TaskExchangeName, // name
"direct", // type "direct", // type
true, // durable true, // durable
false, // auto-deleted false, // auto-deleted
false, // internal false, // internal
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err) return fmt.Errorf("failed to declare exchange: %w", err)
@ -81,12 +81,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable queue with priority support and message TTL // Declare durable queue with priority support and message TTL
_, err = p.ch.QueueDeclare( _, err = p.ch.QueueDeclare(
constants.TaskQueueName, // name constants.TaskQueueName, // name
true, // durable true, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
false, // no-wait false, // no-wait
amqp.Table{ amqp.Table{
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10 "x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL "x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
}, },
) )
@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error {
constants.TaskQueueName, // queue name constants.TaskQueueName, // queue name
constants.TaskRoutingKey, // routing key constants.TaskRoutingKey, // routing key
constants.TaskExchangeName, // exchange name constants.TaskExchangeName, // exchange name
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to bind queue: %w", err) return fmt.Errorf("failed to bind queue: %w", err)
@ -148,15 +148,15 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
ctx, ctx,
constants.TaskExchangeName, // exchange constants.TaskExchangeName, // exchange
constants.TaskRoutingKey, // routing key constants.TaskRoutingKey, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
publishing, publishing,
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to publish task message: %w", err) return fmt.Errorf("failed to publish task message: %w", err)
} }
logger.Info(ctx, "Task published to queue", logger.Info(ctx, "task published to queue",
"task_id", taskID.String(), "task_id", taskID.String(),
"task_type", taskType, "task_type", taskType,
"priority", priority, "priority", priority,
@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU
backoff := time.Duration(1<<uint(i)) * time.Second backoff := time.Duration(1<<uint(i)) * time.Second
backoff = min(backoff, 10*time.Second) backoff = min(backoff, 10*time.Second)
logger.Warn(ctx, "Failed to publish task, retrying", logger.Warn(ctx, "failed to publish task, retrying",
"task_id", taskID.String(), "task_id", taskID.String(),
"attempt", i+1, "attempt", i+1,
"max_retries", maxRetries, "max_retries", maxRetries,
@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error {
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) { func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
queue, err := p.ch.QueueDeclarePassive( queue, err := p.ch.QueueDeclarePassive(
constants.TaskQueueName, // name constants.TaskQueueName, // name
true, // durable true, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
false, // no-wait false, // no-wait
amqp.Table{ amqp.Table{
"x-max-priority": constants.TaskMaxPriority, "x-max-priority": constants.TaskMaxPriority,
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), "x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
@ -246,20 +246,20 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan
case <-ctx.Done(): case <-ctx.Done():
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel") logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
return return
case msg, ok := <-taskChan: case task, ok := <-taskChan:
if !ok { if !ok {
logger.Info(ctx, "task channel closed, exiting push loop") logger.Info(ctx, "task channel closed, exiting push loop")
return return
} }
// Restore trace context from the handler that enqueued this message // Restore trace context from the handler that enqueued this message
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier))
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish", taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())), oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())),
) )
if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil { if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil {
pubSpan.RecordError(err) pubSpan.RecordError(err)
logger.Error(taskCtx, "publish task to RabbitMQ failed", logger.Error(taskCtx, "publish task to RabbitMQ failed",
"task_id", msg.TaskID, "error", err) "task_id", task.TaskID, "error", err)
} }
pubSpan.End() pubSpan.End()
} }

View File

@ -57,7 +57,7 @@ func NewExponentialBackoffRetry(maxRetries int, initialDelay, maxDelay time.Dura
// ShouldRetry implements exponential backoff with jitter // ShouldRetry implements exponential backoff with jitter
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) { func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
if retryCount >= s.MaxRetries { if retryCount >= s.MaxRetries {
logger.Info(ctx, "Task reached maximum retry count", logger.Info(ctx, "task reached maximum retry count",
"task_id", taskID, "task_id", taskID,
"retry_count", retryCount, "retry_count", retryCount,
"max_retries", s.MaxRetries, "max_retries", s.MaxRetries,
@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
} }
} }
logger.Info(ctx, "Task will be retried", logger.Info(ctx, "task will be retried",
"task_id", taskID, "task_id", taskID,
"retry_count", retryCount, "retry_count", retryCount,
"next_retry_in", delay, "next_retry_in", delay,

View File

@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError) shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
if !shouldRetry { if !shouldRetry {
// Mark task as permanently failed // Mark task as permanently failed
logger.Info(ctx, "Task will not be retried, marking as failed", logger.Info(ctx, "task will not be retried, marking as failed",
"task_id", taskID, "task_id", taskID,
"retry_count", retryCount, "retry_count", retryCount,
"max_retries", q.strategy.GetMaxRetries(), "max_retries", q.strategy.GetMaxRetries(),
@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
} }
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil { if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
// Log but don't fail the whole retry scheduling // Log but don't fail the whole retry scheduling
logger.Warn(ctx, "Failed to update task error info", logger.Warn(ctx, "failed to update task error info",
"task_id", taskID, "task_id", taskID,
"error", err, "error", err,
) )
@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
}) })
if err != nil { if err != nil {
logger.Error(ctx, "Failed to schedule task retry", logger.Error(ctx, "failed to schedule task retry",
"task_id", taskID, "task_id", taskID,
"task_type", taskType, "task_type", taskType,
"retry_count", retryCount, "retry_count", retryCount,
@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
return err return err
} }
logger.Info(ctx, "Task scheduled for retry", logger.Info(ctx, "task scheduled for retry",
"task_id", taskID, "task_id", taskID,
"task_type", taskType, "task_type", taskType,
"retry_count", retryCount+1, "retry_count", retryCount+1,
@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Get tasks due for retry // Get tasks due for retry
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize) tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
if err != nil { if err != nil {
logger.Error(ctx, "Failed to get tasks for retry", "error", err) logger.Error(ctx, "failed to get tasks for retry", "error", err)
return err return err
} }
@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
return nil return nil
} }
logger.Info(ctx, "Processing retry queue", logger.Info(ctx, "processing retry queue",
"task_count", len(tasks), "task_count", len(tasks),
"batch_size", batchSize, "batch_size", batchSize,
) )
@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Publish task to queue for immediate processing // Publish task to queue for immediate processing
taskType := TaskType(task.TaskType) taskType := TaskType(task.TaskType)
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil { if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
logger.Error(ctx, "Failed to publish retry task to queue", logger.Error(ctx, "failed to publish retry task to queue",
"task_id", task.TaskID, "task_id", task.TaskID,
"task_type", taskType, "task_type", taskType,
"error", err, "error", err,
@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Update task status back to submitted // Update task status back to submitted
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil { if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
logger.Warn(ctx, "Failed to update retry task status", logger.Warn(ctx, "failed to update retry task status",
"task_id", task.TaskID, "task_id", task.TaskID,
"error", err, "error", err,
) )
@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Clear next retry time since task is being retried now // Clear next retry time since task is being retried now
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil { if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
logger.Warn(ctx, "Failed to clear next retry time", logger.Warn(ctx, "failed to clear next retry time",
"task_id", task.TaskID, "task_id", task.TaskID,
"error", err, "error", err,
) )
} }
logger.Info(ctx, "Retry task resubmitted", logger.Info(ctx, "retry task resubmitted",
"task_id", task.TaskID, "task_id", task.TaskID,
"task_type", taskType, "task_type", taskType,
"retry_count", task.RetryCount, "retry_count", task.RetryCount,
@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
logger.Info(ctx, "Retry scheduler stopping") logger.Info(ctx, "retry scheduler stopping")
return return
case <-ticker.C: case <-ticker.C:
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil { if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
logger.Error(ctx, "Error processing retry queue", "error", err) logger.Error(ctx, "error processing retry queue", "error", err)
} }
} }
} }

View File

@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
return fmt.Errorf("invalid parameter type for TestTask") return fmt.Errorf("invalid parameter type for TestTask")
} }
logger.Info(ctx, "Starting test task executionser", logger.Info(ctx, "starting test task executionser",
"task_id", taskID, "task_id", taskID,
"sleep_duration_seconds", params.SleepDuration, "sleep_duration_seconds", params.SleepDuration,
"message", params.Message, "message", params.Message,
@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
// Save result to database // Save result to database
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil { if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
logger.Error(ctx, "Failed to save test task result", logger.Error(ctx, "failed to save test task result",
"task_id", taskID, "task_id", taskID,
"error", err, "error", err,
) )
return fmt.Errorf("failed to save task result: %w", err) return fmt.Errorf("failed to save task result: %w", err)
} }
logger.Info(ctx, "Test task completed successfully", logger.Info(ctx, "test task completed successfully",
"task_id", taskID, "task_id", taskID,
"sleep_duration_seconds", params.SleepDuration, "sleep_duration_seconds", params.SleepDuration,
) )
@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler {
// Execute processes a test task using the unified task interface // Execute processes a test task using the unified task interface
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Executing test task", logger.Info(ctx, "executing test task",
"task_id", taskID, "task_id", taskID,
"task_params", params, "task_params", params,
"db", db, "db", db,

View File

@ -178,30 +178,26 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
// Start begins consuming tasks from the queue // Start begins consuming tasks from the queue
func (w *TaskWorker) Start() error { func (w *TaskWorker) Start() error {
logger.Info(w.ctx, "Starting task worker", logger.Info(w.ctx, "starting task worker",
"pool_size", w.cfg.PoolSize, "pool_size", w.cfg.PoolSize,
"queue_consumers", w.cfg.QueueConsumerCount, "queue_consumers", w.cfg.QueueConsumerCount,
) )
// Start multiple consumers for better throughput // Start multiple consumers for better throughput
for i := 0; i < w.cfg.QueueConsumerCount; i++ { for i := 0; i < w.cfg.QueueConsumerCount; i++ {
w.wg.Add(1) w.wg.Go(func() { w.consumerLoop(i) })
go w.consumerLoop(i)
} }
// Start health check goroutine // Start health check goroutine
w.wg.Add(1) w.wg.Go(w.healthCheckLoop)
go w.healthCheckLoop()
logger.Info(w.ctx, "Task worker started successfully") logger.Info(w.ctx, "task worker started successfully")
return nil return nil
} }
// consumerLoop runs a single RabbitMQ consumer // consumerLoop runs a single RabbitMQ consumer
func (w *TaskWorker) consumerLoop(consumerID int) { func (w *TaskWorker) consumerLoop(consumerID int) {
defer w.wg.Done() logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID)
logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID)
// Consume messages from the queue // Consume messages from the queue
msgs, err := w.ch.Consume( msgs, err := w.ch.Consume(
@ -214,7 +210,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
nil, // args nil, // args
) )
if err != nil { if err != nil {
logger.Error(w.ctx, "Failed to start consumer", logger.Error(w.ctx, "failed to start consumer",
"consumer_id", consumerID, "consumer_id", consumerID,
"error", err, "error", err,
) )
@ -224,11 +220,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
for { for {
select { select {
case <-w.stopChan: case <-w.stopChan:
logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID) logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID)
return return
case msg, ok := <-msgs: case msg, ok := <-msgs:
if !ok { if !ok {
logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID) logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID)
return return
} }
@ -237,7 +233,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
w.handleMessage(msg) w.handleMessage(msg)
}) })
if err != nil { if err != nil {
logger.Error(w.ctx, "Failed to submit task to pool", logger.Error(w.ctx, "failed to submit task to pool",
"consumer_id", consumerID, "consumer_id", consumerID,
"error", err, "error", err,
) )
@ -265,7 +261,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Parse task message // Parse task message
var taskMsg TaskQueueMessage var taskMsg TaskQueueMessage
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil { if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
logger.Error(ctx, "Failed to unmarshal task message", "error", err) logger.Error(ctx, "failed to unmarshal task message", "error", err)
msg.Nack(false, false) // Reject without requeue msg.Nack(false, false) // Reject without requeue
w.metrics.mu.Lock() w.metrics.mu.Lock()
w.metrics.TotalFailed++ w.metrics.TotalFailed++
@ -275,7 +271,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Validate message // Validate message
if !taskMsg.Validate() { if !taskMsg.Validate() {
logger.Error(ctx, "Invalid task message", logger.Error(ctx, "invalid task message",
"task_id", taskMsg.TaskID, "task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType, "task_type", taskMsg.TaskType,
) )
@ -299,7 +295,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
defer span.End() defer span.End()
ctx = taskCtx ctx = taskCtx
logger.Info(ctx, "Processing task", logger.Info(ctx, "processing task",
"task_id", taskMsg.TaskID, "task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType, "task_type", taskMsg.TaskType,
"priority", taskMsg.Priority, "priority", taskMsg.Priority,
@ -307,7 +303,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to RUNNING in database // Update task status to RUNNING in database
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil { if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
logger.Error(ctx, "Failed to update task status", "error", err) logger.Error(ctx, "failed to update task status", "error", err)
msg.Nack(false, true) // Reject with requeue msg.Nack(false, true) // Reject with requeue
w.metrics.mu.Lock() w.metrics.mu.Lock()
w.metrics.TotalFailed++ w.metrics.TotalFailed++
@ -326,7 +322,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
processingTime := time.Since(startTime) processingTime := time.Since(startTime)
if err != nil { if err != nil {
logger.Error(ctx, "Task execution failed", logger.Error(ctx, "task execution failed",
"task_id", taskMsg.TaskID, "task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType, "task_type", taskMsg.TaskType,
"processing_time", processingTime, "processing_time", processingTime,
@ -335,7 +331,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to FAILED // Update task status to FAILED
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil { if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
logger.Error(ctx, "Failed to update task with error", "error", updateErr) logger.Error(ctx, "failed to update task with error", "error", updateErr)
} }
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil { if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
@ -353,7 +349,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to COMPLETED // Update task status to COMPLETED
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil { if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
logger.Error(ctx, "Failed to update task status to completed", "error", err) logger.Error(ctx, "failed to update task status to completed", "error", err)
// Still ack the message since task was processed successfully // Still ack the message since task was processed successfully
} }
@ -364,7 +360,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Acknowledge message // Acknowledge message
msg.Ack(false) msg.Ack(false)
logger.Info(ctx, "Task completed successfully", logger.Info(ctx, "task completed successfully",
"task_id", taskMsg.TaskID, "task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType, "task_type", taskMsg.TaskType,
"processing_time", processingTime, "processing_time", processingTime,
@ -398,7 +394,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
// Update task status in database // Update task status in database
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus) err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
if err != nil { if err != nil {
logger.Error(ctx, "Failed to update task status in database", logger.Error(ctx, "failed to update task status in database",
"task_id", taskID, "task_id", taskID,
"status", status, "status", status,
"error", err, "error", err,
@ -410,7 +406,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
if status == StatusRunning { if status == StatusRunning {
startedAt := time.Now().Unix() startedAt := time.Now().Unix()
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil { if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
logger.Warn(ctx, "Failed to update task start time", logger.Warn(ctx, "failed to update task start time",
"task_id", taskID, "task_id", taskID,
"error", err, "error", err,
) )
@ -423,14 +419,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
finishedAt := time.Now().Unix() finishedAt := time.Now().Unix()
if status == StatusCompleted { if status == StatusCompleted {
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
logger.Warn(ctx, "Failed to mark task as completed", logger.Warn(ctx, "failed to mark task as completed",
"task_id", taskID, "task_id", taskID,
"error", err, "error", err,
) )
} }
} else { } else {
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
logger.Warn(ctx, "Failed to mark task as failed", logger.Warn(ctx, "failed to mark task as failed",
"task_id", taskID, "task_id", taskID,
"error", err, "error", err,
) )
@ -438,7 +434,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
} }
} }
logger.Debug(ctx, "Task status updated", logger.Debug(ctx, "task status updated",
"task_id", taskID, "task_id", taskID,
"status", status, "status", status,
) )
@ -451,16 +447,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
stackTrace := fmt.Sprintf("%+v", err) stackTrace := fmt.Sprintf("%+v", err)
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil { if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr) logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr)
return updateErr return updateErr
} }
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil { if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr) logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr)
return updateErr return updateErr
} }
logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg) logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg)
return nil return nil
} }
@ -469,7 +465,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error { func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
handler, err := w.factory.GetHandler(taskType) handler, err := w.factory.GetHandler(taskType)
if err != nil { if err != nil {
logger.Error(ctx, "No handler for task type", "task_type", taskType) logger.Error(ctx, "no handler for task type", "task_type", taskType)
msg.Nack(false, false) msg.Nack(false, false)
return err return err
} }
@ -478,8 +474,6 @@ func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uui
// healthCheckLoop periodically checks worker health and metrics // healthCheckLoop periodically checks worker health and metrics
func (w *TaskWorker) healthCheckLoop() { func (w *TaskWorker) healthCheckLoop() {
defer w.wg.Done()
ticker := time.NewTicker(w.cfg.PollingInterval) ticker := time.NewTicker(w.cfg.PollingInterval)
defer ticker.Stop() defer ticker.Stop()
@ -519,7 +513,7 @@ func (w *TaskWorker) checkHealth() {
w.metrics.WorkersIdle = w.pool.Free() w.metrics.WorkersIdle = w.pool.Free()
w.metrics.LastHealthCheck = time.Now() w.metrics.LastHealthCheck = time.Now()
logger.Info(w.ctx, "Worker health check", logger.Info(w.ctx, "worker health check",
"tasks_processed", w.metrics.TotalProcessed, "tasks_processed", w.metrics.TotalProcessed,
"tasks_failed", w.metrics.TotalFailed, "tasks_failed", w.metrics.TotalFailed,
"tasks_success", w.metrics.TotalSuccess, "tasks_success", w.metrics.TotalSuccess,
@ -536,7 +530,7 @@ func (w *TaskWorker) checkHealth() {
// Stop gracefully stops the task worker // Stop gracefully stops the task worker
func (w *TaskWorker) Stop() error { func (w *TaskWorker) Stop() error {
logger.Info(w.ctx, "Stopping task worker") logger.Info(w.ctx, "stopping task worker")
// Signal all goroutines to stop // Signal all goroutines to stop
close(w.stopChan) close(w.stopChan)

View File

@ -7,15 +7,22 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
// MapSlice define func to build a new slice by applying f to every element of s.
func MapSlice[T, U any](s []T, f func(T) U) []U {
result := make([]U, 0, len(s))
for _, item := range s {
result = append(result, f(item))
}
return result
}
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64 // ConvertZSetMembersToFloat64 define func to conver zset member type to float64
func ConvertZSetMembersToFloat64(members []redis.Z) []float64 { func ConvertZSetMembersToFloat64(members []redis.Z) []float64 {
dataFloats := make([]float64, 0, len(members))
// recovery time sorted in ascending order // recovery time sorted in ascending order
sortRedisZByTimeMemberAscending(members) sortRedisZByTimeMemberAscending(members)
for _, member := range members { return MapSlice(members, func(member redis.Z) float64 {
dataFloats = append(dataFloats, member.Score) return member.Score
} })
return dataFloats
} }
func sortRedisZByTimeMemberAscending(data []redis.Z) { func sortRedisZByTimeMemberAscending(data []redis.Z) {

View File

@ -1,11 +1,13 @@
// Package util provide some utility functions // Package util provide some utility functions
package util package util
// GetKeysFromSet define func to get all keys from a map[string]struct{} import (
func GetKeysFromSet(set map[string]struct{}) []string { "maps"
keys := make([]string, 0, len(set)) "slices"
for key := range set { )
keys = append(keys, key)
} // GetKeysFromSet define func to get all keys from a set-like map.
return keys // It delegates to the standard library maps/slices helpers.
func GetKeysFromSet[K comparable, V any](set map[K]V) []K {
return slices.Collect(maps.Keys(set))
} }

View File

@ -1,12 +1,9 @@
// Package util provide some utility functions // Package util provide some utility functions
package util package util
// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings // RemoveTargetsFromSliceSimple define func to remove targets from a slice
func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string { func RemoveTargetsFromSliceSimple[T comparable](targetsSlice []T, targetsToRemove []T) []T {
targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove)) targetsToRemoveSet := SliceToSet(targetsToRemove)
for _, target := range targetsToRemove {
targetsToRemoveSet[target] = struct{}{}
}
for i := len(targetsSlice) - 1; i >= 0; i-- { for i := len(targetsSlice) - 1; i >= 0; i-- {
if _, found := targetsToRemoveSet[targetsSlice[i]]; found { if _, found := targetsToRemoveSet[targetsSlice[i]]; found {
@ -17,21 +14,21 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin
return targetsSlice return targetsSlice
} }
// SliceToSet define func to convert string slice to set // SliceToSet define func to convert a slice to a set
func SliceToSet(targetsSlice []string) map[string]struct{} { func SliceToSet[T comparable](targetsSlice []T) map[T]struct{} {
set := make(map[string]struct{}, len(targetsSlice)) set := make(map[T]struct{}, len(targetsSlice))
for _, target := range targetsSlice { for _, target := range targetsSlice {
set[target] = struct{}{} set[target] = struct{}{}
} }
return set return set
} }
// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates // DeduplicateAndReportDuplicates define func to deduplicate a slice and report duplicates
func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) { func DeduplicateAndReportDuplicates[T comparable](targetsSlice []T, sourceSlice []T) (deduplicated []T, duplicates []T) {
targetSet := SliceToSet(targetsSlice) targetSet := SliceToSet(targetsSlice)
deduplicated = make([]string, 0, len(sourceSlice)) deduplicated = make([]T, 0, len(sourceSlice))
// duplicate items slice // duplicate items slice
duplicates = make([]string, 0, len(sourceSlice)) duplicates = make([]T, 0, len(sourceSlice))
for _, source := range sourceSlice { for _, source := range sourceSlice {
if _, found := targetSet[source]; found { if _, found := targetSet[source]; found {

84
util/typed_map.go Normal file
View File

@ -0,0 +1,84 @@
// Package util provide some utility functions
package util
import (
"iter"
"sync"
)
// TypedMap define a type-safe generic wrapper around sync.Map.
// It keeps the concurrency guarantees of sync.Map while removing the
// per-call-site type assertions previously required for every Load.
type TypedMap[K comparable, V any] struct {
m sync.Map
}
// Load define func of return the value stored for key, and whether it was found.
func (t *TypedMap[K, V]) Load(key K) (V, bool) {
value, ok := t.m.Load(key)
if !ok {
var zero V
return zero, false
}
// safe: only values of type V are ever stored through this wrapper
return value.(V), true
}
// Store define func of set the value for key.
func (t *TypedMap[K, V]) Store(key K, value V) {
t.m.Store(key, value)
}
// LoadOrStore define func of return the existing value for key if present.
// Otherwise it stores and returns the given value. loaded reports whether the
// value was already present.
func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
v, loaded := t.m.LoadOrStore(key, value)
return v.(V), loaded
}
// Swap define func of store value for key and return the previous value (if any).
// loaded reports whether a value was already present for key.
func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) {
prev, loaded := t.m.Swap(key, value)
if !loaded {
var zero V
return zero, false
}
return prev.(V), true
}
// Delete define func of remove the value for key.
func (t *TypedMap[K, V]) Delete(key K) {
t.m.Delete(key)
}
// Range define func of iterate over all key/value pairs.
// Iteration stops early if f returns false.
func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) {
t.m.Range(func(key, value any) bool {
return f(key.(K), value.(V))
})
}
// Len define func of return the number of key/value pairs currently stored.
// It walks the map, so the cost is O(n).
func (t *TypedMap[K, V]) Len() int {
count := 0
t.m.Range(func(_, _ any) bool {
count++
return true
})
return count
}
// All define func of return an iterator over all key/value pairs,
// usable directly with range-over-func. Iteration stops early if the
// consumer breaks out of the loop.
func (t *TypedMap[K, V]) All() iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
t.m.Range(func(key, value any) bool {
return yield(key.(K), value.(V))
})
}
}