Compare commits

...

10 Commits

Author SHA1 Message Date
douxu ca68cf6c18 refactor: extend TypedMap and migrate MeasComputeState onto it
- add LoadOrStore, Len, and All (range-over-func) to util.TypedMap
  - embed util.TypedMap in MeasComputeState, dropping its hand-written
    sync.Map wrappers and per-call-site type assertions
  - iterate graphOverview via All() instead of Range in PrintGrapMap
  - remove unused Set/Comparer/OrderedSet/HashSet code from redis_zset.go
  - update deploy.md to replace Promtail with Grafana Alloy in the
    observability stack
2026-06-18 16:06:06 +08:00
douxu c82ad773a3 refactor: lowercase channel name suffixes and rename PS to PF
- change all ChannelSuffix values from uppercase to lowercase
  - rename ChannelSuffixPS ("PS") to ChannelSuffixPF ("pf")
  - align channel suffix naming with downstream measurement keys
2026-06-17 10:47:35 +08:00
douxu 82622d0d85 refactor: add generic helpers and type-safe TypedMap wrapper
- add util.TypedMap, a generic wrapper over sync.Map to drop call-site type assertions
  - add generic util.MapSlice and reuse it in ConvertZSetMembersToFloat64
  - make GetKeysFromSet/SliceToSet/RemoveTargetsFromSliceSimple/DeduplicateAndReportDuplicates generic
  - migrate graphOverview to util.TypedMap[int64, *Graph]
  - build redis suggestions via util.MapSlice in measurement group recommend
2026-06-16 16:15:28 +08:00
douxu 908c713565 chore: add rabbitmq cert secret script and plugins configmap
- add rabbitmq-certs-secret.sh helper to create the server cert secret
  - add rabbitmq-plugins-config.yaml ConfigMap enabling ssl auth, management,
    prometheus, and web dispatch plugins
  - rename rabbitmq Deployment from `eventrt-rabbitmq` to `rabbitmq`
  - document the secret-creation script in deploy.md
2026-06-12 11:20:58 +08:00
douxu 64b6562784 docs: overhaul deploy.md cleanup and pg verification sections
- add pg connection verification commands (pg_isready, psql queries)
  - renumber pg subsections (4.4.2→4.4.5) to accommodate new section
  - remove MongoDB deploy section (section 4.5) from modelRT deploy guide
  - remove MongoDB SSH tunnel port-forward entries (27017/30017)
  - rewrite section 8 cleanup guide: split into local Docker, local run,
    and K8s(Minikube) categories with scale-down and full-delete options
  - add one-liner kubectl delete -f deploy/k8s/ for full teardown
2026-06-10 16:42:29 +08:00
douxu 05c64dda14 chore: add imagePullPolicy and migrate WaitGroup to wg.Go
- add imagePullPolicy: IfNotPresent to all k8s Deployments, DaemonSet
    (grafana, jaeger, loki, rabbitmq, redis, promtail)
  - migrate wg.Add(1)/go/defer wg.Done() pattern to wg.Go() (Go 1.25+)
    in logger/loki_syncer.go and task/worker.go
  - simplify redundant map existence check before delete in diagram/graph.go
  - update deploy.md to reflect pg PVC size (6Gi) and resource limits
2026-06-10 16:40:50 +08:00
douxu c4e892f1c7 fix: correct typo in Jaeger K8s service name
- rename `jaeger-serivce` to `jaeger-service` in jaeger-service.yaml
2026-06-08 17:05:21 +08:00
douxu 195150d9b1 fix: fix K8s service names, deployment command, and GORM logger
- rename all K8s services to xxx-service convention and update
    all configmap references (postgres, mongodb, loki, jaeger)
  - add explicit command: ["/app/modelrt"] to deployment to prevent
    args from being treated as the executable (no ENTRYPOINT in
    Dockerfile)
  - set deploy_env to development to bypass Redis empty-password
    guard in non-production Minikube environment
  - fix GormLogger Info/Warn/Error to use fmt.Sprintf(msg, data...)
    so GORM printf-style messages are formatted correctly and avoid
    json: unsupported type: func() time.Time serialization panic
  - expand pg PVC storage from 2Gi to 6Gi
  - rename loop variable msg to task in PushTaskToRabbitMQ for clarity
  - align comment indentation in queue_producer.go
2026-06-03 17:11:54 +08:00
douxu 3309e53653 docs: document Dockerfile smoke tests and load workflow for Minikube
- add 3-stage build table (builder/certs/scratch) with image size note
  - add build-arg USER_ID override example in section 5.1
  - add section 5.1.1 with smoke-test commands (size check, inspect, dry
    run, full start)
  - add workflow for loading pre-built local images into Minikube
    directly
  - bump builder base image from golang:1.25-alpine to
    golang:1.26-alpine
  - normalize inline Dockerfile comments to lowercase
  - remove example config COPY from final scratch stage
2026-06-02 16:35:13 +08:00
douxu c6545e29ba style: normalize log messages to lowercase across task package
- lowercase first letter of all logger.Info/Warn/Error message strings
    in task/worker.go, task/retry_queue.go, task/handler_factory.go,
    task/metrics_logger.go, task/retry_manager.go, task/queue_producer.go,
    task/initializer.go, task/test_task.go, and main.go
  - fix inline comments in main.go that mixed Chinese and uppercase English
  - align Dockerfile comment casing with project convention
2026-06-01 15:50:11 +08:00
47 changed files with 555 additions and 419 deletions

1
.gitignore vendored
View File

@ -32,6 +32,7 @@ go.work
# ai config
.cursor/
.claude/
.codewhale/
.cursorrules
.copilot/
.chatgpt/

View File

@ -19,15 +19,15 @@ const (
// channel name suffix
const (
ChannelSuffixP = "P"
ChannelSuffixQ = "Q"
ChannelSuffixS = "S"
ChannelSuffixPS = "PS"
ChannelSuffixF = "F"
ChannelSuffixDeltaF = "deltaF"
ChannelSuffixUAB = "UAB"
ChannelSuffixUBC = "UBC"
ChannelSuffixUCA = "UCA"
ChannelSuffixP = "p"
ChannelSuffixQ = "q"
ChannelSuffixS = "s"
ChannelSuffixPF = "pf"
ChannelSuffixF = "f"
ChannelSuffixDeltaF = "df"
ChannelSuffixUAB = "uab"
ChannelSuffixUBC = "ubc"
ChannelSuffixUCA = "uca"
)
const (

View File

@ -640,6 +640,12 @@ openssl x509 -in eventrt_client_cert.pem -noout -subject
将服务器端三个证书文件打包为 K8s Secret在证书文件所在目录执行
```bash
sh deploy/k8s/rabbitmq-certs-secret.sh
```
该脚本等价于:
```bash
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
@ -695,7 +701,11 @@ kubectl apply -f deploy/k8s/pg-service.yaml
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
| **存储** | `2Gi` | PVC `postgres-data` |
| **存储** | `6Gi` | PVC `postgres-data` |
| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 |
| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 |
> **注意:** 密码当前以明文形式存储在 `pg-configmap.yaml` 中,生产环境应将其迁移至 K8s Secret并通过环境变量注入容器避免将明文密码提交至版本库。
##### 4.4.1 等待 Pod 就绪
@ -703,7 +713,23 @@ kubectl apply -f deploy/k8s/pg-service.yaml
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
```
##### 4.4.2 初始化异步任务表
##### 4.4.2 连接验证
```bash
# 快速检查 PostgreSQL 是否接受连接
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- pg_isready -U postgres -d demo
# 进入 psql 执行简单查询确认数据库可用
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -d demo -c "SELECT current_database(), version();"
# 列出所有数据库(确认 demo 库已创建)
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -c "\l"
```
##### 4.4.3 初始化异步任务表
PostgreSQL 就绪后执行 1.4 节的建表 SQL可通过以下方式进入容器执行
@ -717,14 +743,14 @@ kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metada
-- psql -U postgres -d demo < /path/to/init.sql
```
##### 4.4.3 状态检查
##### 4.4.4 状态检查
```bash
kubectl get pods -l app=postgres
kubectl logs -l app=postgres --tail=30
```
##### 4.4.4 清理
##### 4.4.5 清理
```bash
kubectl delete -f deploy/k8s/pg-service.yaml \
@ -733,68 +759,73 @@ kubectl delete -f deploy/k8s/pg-service.yaml \
-f deploy/k8s/pg-configmap.yaml
```
#### 4.5 部署 MongoDB
```bash
kubectl apply -f deploy/k8s/mongodb-secret.yaml
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
kubectl apply -f deploy/k8s/mongodb-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
| **NodePort** | `30017` | 集群外访问端口 |
| **用户名** | `admin` | Root 管理员 |
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
| **存储** | `2Gi` | PVC `mongodb-data` |
> **注意:** 密码存储在 `mongodb-secret.yaml``stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
##### 4.5.1 等待 Pod 就绪
```bash
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
```
##### 4.5.2 连接验证
```bash
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
-- mongosh -u admin -p coslight --authenticationDatabase admin
```
##### 4.5.3 状态检查
```bash
kubectl get pods -l app=mongodb
kubectl logs -l app=mongodb --tail=30
```
##### 4.5.4 清理
```bash
kubectl delete -f deploy/k8s/mongodb-service.yaml \
-f deploy/k8s/mongodb-statefulset.yaml \
-f deploy/k8s/mongodb-pvc.yaml \
-f deploy/k8s/mongodb-secret.yaml
```
### 5\. 部署 ModelRTKubernetes
所有资源部署在 `default` 命名空间YAML 文件位于 `deploy/k8s/`
#### 5.1 构建并推送镜像
镜像采用三阶段构建,最终基于 `scratch`
| 阶段 | 基础镜像 | 作用 |
| :--- | :--- | :--- |
| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0``-trimpath -ldflags="-s -w"` |
| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义UID 默认 `1000` |
| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 |
**方式一:从源码构建并加载**
```bash
# 在项目根目录执行
# 在项目根目录执行(默认运行用户 UID=1000
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
# 推送到镜像仓库(或直接加载到 Minikube
# 自定义运行用户 UID
docker build -f deploy/dockerfile/modelrt.Dockerfile \
--build-arg USER_ID=2000 \
-t coslight/modelrt:latest .
# 加载到 Minikube无需私有仓库
minikube image load coslight/modelrt:latest
```
**方式二:直接加载已有本地镜像**
Ubuntu 宿主机上已存在构建好的镜像(如 `modelrt:v1`)时,无需重新构建,直接导入 Minikube
```bash
# 确认本地镜像存在
docker images modelrt:v1
# 加载到 Minikube
minikube image load modelrt:v1
# 验证镜像已进入 Minikube 缓存
minikube image ls | grep modelrt
```
> **注意:** `deploy/k8s/modelrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。
#### 5.1.1 镜像冒烟测试
```bash
# 查看镜像大小scratch 镜像预期 ≤ 25 MB
docker images coslight/modelrt:latest
# 检查镜像元信息(确认 User、Cmd、架构
docker inspect coslight/modelrt:latest
# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常)
docker run --rm coslight/modelrt:latest
# 挂载示例配置做完整启动验证Ctrl+C 退出)
docker run --rm \
-v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \
-p 8080:8080 \
coslight/modelrt:latest
```
> **注意:** `scratch` 镜像不含 shell无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`
#### 5.2 创建客户端证书 Secret
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
@ -864,7 +895,9 @@ kubectl delete secret modelrt-certs
### 6\. 部署可观测性栈Kubernetes
`Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`
`Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`
> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`DaemonSet负责它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。
#### 6.1 部署 Jaeger
@ -882,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml
kubectl apply -f deploy/k8s/loki-service.yaml
```
#### 6.3 部署 Promtail
#### 6.3 部署 Alloy
```bash
kubectl apply -f deploy/k8s/promtail-rbac.yaml
kubectl apply -f deploy/k8s/promtail-configmap.yaml
kubectl apply -f deploy/k8s/promtail-daemonset.yaml
kubectl apply -f deploy/k8s/alloy-rbac.yaml
kubectl apply -f deploy/k8s/alloy-configmap.yaml
kubectl apply -f deploy/k8s/alloy-daemonset.yaml
```
> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole``alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml``config.alloy` 中。
#### 6.4 部署 Grafana
```bash
@ -907,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/promtail-rbac.yaml \
-f deploy/k8s/promtail-configmap.yaml \
-f deploy/k8s/promtail-daemonset.yaml \
-f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-service.yaml
@ -955,7 +990,6 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101)
```bash
ssh -L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
@ -971,7 +1005,6 @@ ssh -L 5432:192.168.49.2:30432 \
```bash
ssh -fN \
-L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
@ -987,7 +1020,6 @@ ssh -fN \
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
| :--- | :--- | :--- | :--- |
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` |
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
@ -1011,14 +1043,111 @@ kill <PID>
### 8\. 后续操作(停止与清理)
#### 8.1 停止容器
#### 8.1 本地 Docker 部署清理
适用于第 1、2 节使用 `docker run` 启动的 PostgreSQL 和 Redis 容器。
```bash
# 停止容器
docker stop postgres redis
```
#### 8.2 删除容器(删除后数据将丢失)
```bash
# 删除容器(容器内数据将同步丢失)
docker rm postgres redis
```
#### 8.2 本地运行清理
适用于第 3 节以 `go run` 或编译后二进制方式在本地启动的 ModelRT 服务。
前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程:
```bash
# 终止 go run 启动的进程
pkill -f "go run main.go"
# 或终止编译后的二进制进程
pkill model-rt
```
#### 8.3 K8s(Minikube) 部署清理
适用于第 4、5、6 节在 Minikube 中部署的所有资源。
##### 8.3.1 分服务清理
**仅停止(缩容至 0PVC 数据保留)**
将所有 Deployment 和 StatefulSet 缩容至 0 副本Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。
```bash
# 停止所有 DeploymentRedis / RabbitMQ / ModelRT / Jaeger / Loki / Grafana
kubectl scale deployment --all --replicas=0
# 停止所有 StatefulSetPostgreSQLPVC 数据保留)
kubectl scale statefulset --all --replicas=0
```
恢复时:
```bash
kubectl scale deployment --all --replicas=1
kubectl scale statefulset --all --replicas=1
```
> **注意:** DaemonSetAlloy无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。
---
**永久清理(删除所有资源,包含 PVC数据不可恢复**
按部署顺序反向删除各服务资源:
```bash
# 可观测性栈Grafana / Alloy / Loki / Jaeger
kubectl delete -f deploy/k8s/grafana-service.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-configmap.yaml \
-f deploy/k8s/jaeger-service.yaml \
-f deploy/k8s/jaeger-deployment.yaml
# ModelRT 应用
kubectl delete -f deploy/k8s/modelrt-service.yaml \
-f deploy/k8s/modelrt-deployment.yaml \
-f deploy/k8s/modelrt-configmap.yaml \
-f deploy/k8s/modelrt-secret.yaml
kubectl delete secret modelrt-certs
# PostgreSQL
kubectl delete -f deploy/k8s/pg-service.yaml \
-f deploy/k8s/pg-statefulset.yaml \
-f deploy/k8s/pg-pvc.yaml \
-f deploy/k8s/pg-configmap.yaml
# RabbitMQ
kubectl delete -f deploy/k8s/rabbitmq-service.yaml \
-f deploy/k8s/rabbitmq-deployment.yaml \
-f deploy/k8s/rabbitmq-users-config.yaml \
-f deploy/k8s/rabbitmq-config.yaml \
-f deploy/k8s/rabbitmq-secret.yaml
kubectl delete secret rabbitmq-certs
# Redis
kubectl delete -f deploy/k8s/redis-service.yaml \
-f deploy/k8s/redis-deployment.yaml
```
##### 8.3.2 一键清理
> **注意:** 此操作会删除 `deploy/k8s/` 下所有 YAML 对应的 K8s 资源,包括 PVC**持久化数据将永久丢失**,请确认后执行。
```bash
kubectl delete -f deploy/k8s/
kubectl delete secret rabbitmq-certs modelrt-certs
```

View File

@ -1,4 +1,4 @@
FROM golang:1.25-alpine AS builder
FROM golang:1.26-alpine AS builder
RUN apk --no-cache upgrade
WORKDIR /app
@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \
-mod=readonly \
-o modelrt main.go
# Prepare runtime dependencies in a pinned Alpine stage so they can be
# copied into scratch without pulling any vulnerable OS packages at run time.
# prepare runtime dependencies in a pinned alpine stage so they can be
# copied into scratch without pulling any vulnerable os packages at run time.
FROM alpine:3.21 AS certs
ARG USER_ID=1000
RUN apk --no-cache add ca-certificates tzdata && \
@ -21,15 +21,14 @@ RUN apk --no-cache add ca-certificates tzdata && \
FROM scratch
# CA certificates required for TLS connections (RabbitMQ amqps://)
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Timezone data
# timezone data
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
# Non-root user/group definitions
# non-root user/group definitions
COPY --from=certs /etc/passwd /etc/passwd
COPY --from=certs /etc/group /etc/group
WORKDIR /app
COPY --from=builder /app/modelrt ./modelrt
COPY configs/config.example.yaml ./configs/config.example.yaml
USER modelrt
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]

View File

@ -10,7 +10,7 @@ data:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
url: http://loki-service:3100
isDefault: true
jsonData:
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
@ -23,4 +23,4 @@ data:
type: jaeger
uid: jaeger
access: proxy
url: http://jaeger:16686
url: http://jaeger-service:16686

View File

@ -16,6 +16,7 @@ spec:
containers:
- name: grafana
image: grafana/grafana:10.4.2
imagePullPolicy: IfNotPresent
ports:
- containerPort: 3000
env:

View File

@ -1,14 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: grafana
name: grafana-service
namespace: default
spec:
ports:
- name: http
port: 3000
targetPort: 3000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
selector:
app: grafana
type: NodePort

View File

@ -15,6 +15,7 @@ spec:
containers:
- name: jaeger
image: jaegertracing/all-in-one:1.56
imagePullPolicy: IfNotPresent
env:
- name: COLLECTOR_OTLP_ENABLED
value: "true"

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: jaeger
name: jaeger-service
labels:
app: jaeger
spec:
@ -9,19 +9,19 @@ spec:
- name: ui
port: 16686
targetPort: 16686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
- name: collector-http
port: 14268
targetPort: 14268
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
- name: otlp-http
port: 4318
targetPort: 4318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
- name: otlp-grpc
port: 4317
targetPort: 4317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
selector:
app: jaeger
type: NodePort

View File

@ -20,6 +20,7 @@ spec:
containers:
- name: loki
image: grafana/loki:2.9.4
imagePullPolicy: IfNotPresent
args:
- -config.file=/etc/loki/loki.yaml
ports:

View File

@ -1,14 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: loki
name: loki-service
namespace: default
spec:
ports:
- name: http
port: 3100
targetPort: 3100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
selector:
app: loki
type: NodePort

View File

@ -5,7 +5,7 @@ metadata:
data:
config.yaml: |
postgres:
host: "192.168.1.101"
host: "postgres-service"
port: 5432
database: "demo"
user: "postgres"
@ -35,7 +35,7 @@ data:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
otel:
endpoint: "jaeger:4318"
endpoint: "jaeger-service:4318"
insecure: true
ants:
@ -77,7 +77,7 @@ data:
service_addr: ":8080"
service_name: "modelRT"
secret_key: "" # injected via env SERVICE_SECRET_KEY
deploy_env: "production"
deploy_env: "development"
dataRT:
host: "http://127.0.0.1"

View File

@ -16,8 +16,9 @@ spec:
spec:
containers:
- name: modelrt
image: coslight/modelrt:latest
image: modelrt:v1
imagePullPolicy: IfNotPresent
command: ["/app/modelrt"]
args:
- "-modelRT_config_dir=/app/configs"
- "-modelRT_config_name=config"

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: mongodb
name: mongodb-service
labels:
app: mongodb
spec:

View File

@ -34,9 +34,9 @@ spec:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 10
failureThreshold: 12
livenessProbe:
exec:
@ -44,10 +44,10 @@ spec:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 30
periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 3
initialDelaySeconds: 120
periodSeconds: 10
timeoutSeconds: 30
failureThreshold: 5
resources:
requests:
cpu: 100m

View File

@ -7,4 +7,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
storage: 6Gi

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: postgres
name: postgres-service
labels:
app: postgres
spec:

View File

@ -13,7 +13,7 @@ data:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
- url: http://loki-service:3100/loki/api/v1/push
scrape_configs:
- job_name: kubernetes-pods

View File

@ -19,6 +19,7 @@ spec:
containers:
- name: promtail
image: grafana/promtail:2.9.4
imagePullPolicy: IfNotPresent
args:
- -config.file=/etc/promtail/promtail.yaml
ports:

View File

@ -0,0 +1,14 @@
#!/bin/sh
# Create the rabbitmq server certificate secret.
# Run this script from the directory that contains the three cert files,
# or adjust the paths below to point at the actual files.
#
# Expected files (generated during RabbitMQ TLS setup):
# ca_certificate.pem
# server_certificate.pem
# server_key.pem
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=server_certificate.pem=./server_certificate.pem \
--from-file=server_key.pem=./server_key.pem

View File

@ -1,7 +1,7 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: eventrt-rabbitmq
name: rabbitmq
spec:
replicas: 1
selector:
@ -15,6 +15,7 @@ spec:
containers:
- name: rabbitmq
image: rabbitmq:4.1.1-management-alpine
imagePullPolicy: IfNotPresent
ports:
- containerPort: 4369
- containerPort: 5671

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbit-plugins-conf
data:
enabled_plugins: |
[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch].

View File

@ -15,6 +15,7 @@ spec:
containers:
- name: redis
image: redis/redis-stack-server:latest
imagePullPolicy: IfNotPresent
resources:
limits:
memory: "128Mi"

View File

@ -2,24 +2,20 @@
package diagram
import (
"errors"
"fmt"
"sync"
"modelRT/util"
)
// anchorValueOverview define struct of storage all anchor value
var anchorValueOverview sync.Map
// anchorValueOverview define struct of storage all anchor value keyed by component uuid
var anchorValueOverview util.TypedMap[string, string]
// GetAnchorValue define func of get circuit diagram data by componentID
func GetAnchorValue(componentUUID string) (string, error) {
value, ok := diagramsOverview.Load(componentUUID)
anchorValue, ok := anchorValueOverview.Load(componentUUID)
if !ok {
return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID)
}
anchorValue, ok := value.(string)
if !ok {
return "", errors.New("convert to string failed")
}
return anchorValue, nil
}

View File

@ -2,32 +2,27 @@
package diagram
import (
"errors"
"fmt"
"sync"
"modelRT/orm"
"modelRT/util"
)
// diagramsOverview define struct of storage all circuit diagram data
var diagramsOverview sync.Map
// diagramsOverview define struct of storage all circuit diagram data keyed by component uuid
var diagramsOverview util.TypedMap[string, *orm.Component]
// GetComponentMap define func of get circuit diagram data by component uuid
func GetComponentMap(componentUUID string) (*orm.Component, error) {
value, ok := diagramsOverview.Load(componentUUID)
componentInfo, ok := diagramsOverview.Load(componentUUID)
if !ok {
return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID)
}
componentInfo, ok := value.(*orm.Component)
if !ok {
return nil, errors.New("convert to component map struct failed")
}
return componentInfo, nil
}
// UpdateComponentMap define func of update circuit diagram data by component uuid and component info
func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool {
_, result := diagramsOverview.Swap(componentID, componentInfo)
func UpdateComponentMap(componentUUID string, componentInfo *orm.Component) bool {
_, result := diagramsOverview.Swap(componentUUID, componentInfo)
return result
}

View File

@ -65,9 +65,7 @@ func (g *Graph) AddEdge(from, to uuid.UUID) {
// 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中
// 则将其移除
if _, exist := g.FreeVertexs[toKey]; exist {
delete(g.FreeVertexs, toKey)
}
delete(g.FreeVertexs, toKey)
}
// DelNode delete a node to the graph

View File

@ -3,8 +3,6 @@ package diagram
import (
"context"
"iter"
"maps"
locker "modelRT/distributedlock"
"modelRT/logger"
@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error)
}
return results, nil
}
type Comparer[T any] interface {
Compare(T) int
}
type ComparableComparer[T any] interface {
Compare(T) int
comparable // 直接嵌入 comparable 约束
}
type methodNode[E Comparer[E]] struct {
value E
left *methodNode[E]
right *methodNode[E]
}
type MethodTree[E Comparer[E]] struct {
root *methodNode[E]
}
type OrderedSet[E interface {
comparable
Comparer[E]
}] struct {
tree MethodTree[E]
elements map[E]bool
}
type ComparableOrderedSet[E ComparableComparer[E]] struct {
tree MethodTree[E]
elements map[E]bool
}
type Set[E any] interface {
Insert(E)
Delete(E)
Has(E) bool
All() iter.Seq[E]
}
func InsertAll[E any](set Set[E], seq iter.Seq[E]) {
for v := range seq {
set.Insert(v)
}
}
type HashSet[E comparable] map[E]bool
func (s HashSet[E]) Insert(v E) { s[v] = true }
func (s HashSet[E]) Delete(v E) { delete(s, v) }
func (s HashSet[E]) Has(v E) bool { return s[v] }
func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) }

View File

@ -2,32 +2,27 @@
package diagram
import (
"errors"
"fmt"
"sync"
"modelRT/util"
)
// graphOverview define struct of storage all circuit diagram topologic data
var graphOverview sync.Map
// graphOverview define struct of storage all circuit diagram topologic data keyed by pageID
var graphOverview util.TypedMap[int64, *Graph]
// PrintGrapMap define func of print circuit diagram topologic info data
func PrintGrapMap() {
graphOverview.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
for pageID, graph := range graphOverview.All() {
fmt.Println(pageID, graph)
}
}
// GetGraphMap define func of get circuit diagram topologic data by pageID
func GetGraphMap(pageID int64) (*Graph, error) {
value, ok := graphOverview.Load(pageID)
graph, ok := graphOverview.Load(pageID)
if !ok {
return nil, fmt.Errorf("can not find graph by pageID:%d", pageID)
}
graph, ok := value.(*Graph)
if !ok {
return nil, errors.New("convert to graph struct failed")
}
return graph, nil
}

View File

@ -137,7 +137,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp)
return
}
diagram.UpdateComponentMap(info.ID, component)
diagram.UpdateComponentMap(info.UUID, component)
}
if len(request.FreeVertexs) > 0 {

View File

@ -4,6 +4,7 @@ package logger
import (
"context"
"errors"
"fmt"
"time"
"gorm.io/gorm"
@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface {
// Info define func for implementing gormLogger.Interface
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
Info(ctx, msg, "data", data)
Info(ctx, fmt.Sprintf(msg, data...))
}
// Warn define func for implementing gormLogger.Interface
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
Warn(ctx, msg, "data", data)
Warn(ctx, fmt.Sprintf(msg, data...))
}
// Error define func for implementing gormLogger.Interface
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
Error(ctx, msg, "data", data)
Error(ctx, fmt.Sprintf(msg, data...))
}
// Trace define func for implementing gormLogger.Interface

View File

@ -47,8 +47,7 @@ func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
client: &http.Client{Timeout: 5 * time.Second},
ch: make(chan string, 512),
}
ls.wg.Add(1)
go ls.run()
ls.wg.Go(ls.run)
return ls
}
@ -70,7 +69,6 @@ func (ls *lokiSyncer) Sync() error {
}
func (ls *lokiSyncer) run() {
defer ls.wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

View File

@ -179,8 +179,8 @@ func main() {
// init async task worker
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
if err != nil {
logger.Error(ctx, "Failed to initialize task worker", "error", err)
// Continue without task worker, but log warning
logger.Error(ctx, "failed to initialize task worker", "error", err)
// continue without task worker, but log warning
} else {
go taskWorker.Start()
defer taskWorker.Stop()
@ -258,7 +258,7 @@ func main() {
gin.SetMode(gin.ReleaseMode)
}
engine := gin.New()
// 添加CORS中间件
// add CORS middleware
engine.Use(cors.New(cors.Config{
AllowOrigins: []string{"*"}, // 或指定具体域名
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
@ -278,7 +278,7 @@ func main() {
Handler: engine,
}
// creating a System Signal Receiver
// creating a system signal receiver
done := make(chan os.Signal, 10)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {

View File

@ -10,6 +10,7 @@ import (
"modelRT/diagram"
"modelRT/logger"
"modelRT/orm"
"modelRT/util"
"github.com/RediSearch/redisearch-go/v2/redisearch"
)
@ -63,10 +64,9 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
}
safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags)
gridSug := make([]redisearch.Suggestion, 0, len(measSet.AllGridTags))
for _, gridTag := range measSet.AllGridTags {
gridSug = append(gridSug, redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore})
}
gridSug := util.MapSlice(measSet.AllGridTags, func(gridTag string) redisearch.Suggestion {
return redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}
})
ac.AddTerms(gridSug...)
safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags)
@ -78,19 +78,16 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
// building the grid -> zones hierarchy
for gridTag, zoneTags := range measSet.GridToZoneTags {
sug := make([]redisearch.Suggestion, 0, len(zoneTags))
for _, zoneTag := range zoneTags {
term := fmt.Sprintf("%s.%s", gridTag, zoneTag)
// add redis fuzzy search suggestion for token1-token7 type
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
}
// add redis fuzzy search suggestion for token1-token7 type
sug := util.MapSlice(zoneTags, func(zoneTag string) redisearch.Suggestion {
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s", gridTag, zoneTag), Score: constants.DefaultScore}
})
safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags)
ac.AddTerms(sug...)
}
// building the zone -> stations hierarchy
for zoneTag, stationTags := range measSet.ZoneToStationTags {
sug := make([]redisearch.Suggestion, 0, len(stationTags))
gridTag, exists := zoneToGridPath[zoneTag]
if !exists {
err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag)
@ -98,11 +95,10 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
return nil, nil, err
}
for _, stationTag := range stationTags {
// add redis fuzzy search suggestion for token1-token7 type
term := fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag)
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
}
// add redis fuzzy search suggestion for token1-token7 type
sug := util.MapSlice(stationTags, func(stationTag string) redisearch.Suggestion {
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag), Score: constants.DefaultScore}
})
safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags)
ac.AddTerms(sug...)

View File

@ -59,36 +59,36 @@ func NewPower104DataSource(station string, packet, offset int) (*MeasurementData
}
func generateChannelName(prefix string, number int, suffix string) (string, error) {
// shortPrefix is the literal prefix written into the channel name (tm/ts/tc),
// maxNumber is the inclusive upper bound, padWidth is the zero-padded digit width.
var shortPrefix string
var maxNumber, padWidth int
switch prefix {
case constants.ChannelPrefixTelemetry:
if number > 10 {
return "", common.ErrExceedsLimitType
}
var builder strings.Builder
numberStr := strconv.Itoa(number)
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
builder.WriteString(prefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
channelName := builder.String()
return channelName, nil
shortPrefix, maxNumber, padWidth = "tm", 8, 1
case constants.ChannelPrefixTelesignal:
var numberStr string
if number < 10 {
numberStr = "0" + strconv.Itoa(number)
}
numberStr = strconv.Itoa(number)
var builder strings.Builder
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
builder.WriteString(prefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
channelName := builder.String()
return channelName, nil
shortPrefix, maxNumber, padWidth = "ts", 16, 2
case constants.ChannelPrefixTelecommand:
shortPrefix, maxNumber, padWidth = "tc", 9, 1
default:
return "", common.ErrUnsupportedChannelPrefixType
}
if number < 1 || number > maxNumber {
return "", common.ErrExceedsLimitType
}
numberStr := strconv.Itoa(number)
if len(numberStr) < padWidth {
numberStr = strings.Repeat("0", padWidth-len(numberStr)) + numberStr
}
var builder strings.Builder
builder.Grow(len(shortPrefix) + len(numberStr) + len(suffix))
builder.WriteString(shortPrefix)
builder.WriteString(numberStr)
builder.WriteString(suffix)
return builder.String(), nil
}
// NewTelemetryChannel define func of generate telemetry channel CL3611 data source
@ -109,6 +109,15 @@ func NewTelesignalChannel(station, device, channelNameSuffix string, channelNumb
return NewCL3611DataSource(station, device, channelName)
}
// NewTelecommandChannel define func of generate telecommand channel CL3611 data source
func NewTelecommandChannel(station, device, channelNameSuffix string, channelNumber int) (*MeasurementDataSource, error) {
channelName, err := generateChannelName(constants.ChannelPrefixTelecommand, channelNumber, channelNameSuffix)
if err != nil {
return nil, fmt.Errorf("failed to generate channel name: %w", err)
}
return NewCL3611DataSource(station, device, channelName)
}
// NewStandardChannel define func of generate standard channel CL3611 data source
func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) {
return NewCL3611DataSource(station, device, channelType)
@ -264,9 +273,9 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) {
func concatP104WithPlus(station string, packet int, offset int) string {
packetStr := strconv.Itoa(packet)
offsetStr := strconv.Itoa(offset)
return station + ":" + packetStr + ":" + offsetStr
return strings.ToLower(station + ":104:" + packetStr + ":" + offsetStr)
}
func concatCL361WithPlus(station, device, channel string) string {
return station + ":" + device + ":" + "phasor" + ":" + channel
return strings.ToLower(station + ":" + device + ":" + "phasor" + ":" + channel)
}

View File

@ -2,7 +2,7 @@
package realtimedata
import (
"sync"
"modelRT/util"
)
// ComputeConfig define struct of measurement computation
@ -19,54 +19,15 @@ type ComputeConfig struct {
Analyzer RealTimeAnalyzer
}
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
// MeasComputeState define struct of manages the state of measurement
// computations. It embeds util.TypedMap to inherit the concurrency-safe,
// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without
// per-call-site type assertions.
type MeasComputeState struct {
measMap sync.Map
util.TypedMap[string, *ComputeConfig]
}
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
func NewMeasComputeState() *MeasComputeState {
return &MeasComputeState{}
}
// Store define func to store a compute configuration for the specified key
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
m.measMap.Store(key, config)
}
// Load define func to retrieve the compute configuration for the specified key
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
value, ok := m.measMap.Load(key)
if !ok {
return nil, false
}
return value.(*ComputeConfig), true
}
// Delete define func to remove the compute configuration for the specified key
func (m *MeasComputeState) Delete(key string) {
m.measMap.Delete(key)
}
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
value, loaded := m.measMap.LoadOrStore(key, config)
return value.(*ComputeConfig), loaded
}
// Range define func to iterate over all key-configuration pairs in the map
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
m.measMap.Range(func(key, value any) bool {
return f(key.(string), value.(*ComputeConfig))
})
}
// Len define func to return the number of compute configurations in the map
func (m *MeasComputeState) Len() int {
count := 0
m.measMap.Range(func(_, _ any) bool {
count++
return true
})
return count
}

View File

@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType,
defer f.mu.Unlock()
f.handlers[taskType] = handler
logger.Info(ctx, "Handler registered",
logger.Info(ctx, "handler registered",
"task_type", taskType,
"handler_name", handler.Name(),
)
@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler {
// Execute processes an event analysis task
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Starting event analysis",
logger.Info(ctx, "starting event analysis",
"task_id", taskID,
"task_params", params,
)
@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa
// 4. Storing results in database
// Simulate work
logger.Info(ctx, "Event analysis completed",
logger.Info(ctx, "event analysis completed",
"task_id", taskID,
"task_params", params,
"db", db,
@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler {
// Execute processes a batch import task
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Starting batch import",
logger.Info(ctx, "starting batch import",
"task_id", taskID,
"task_params", params,
"db", db,
@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para
// 4. Generating import report
// Simulate work
logger.Info(ctx, "Batch import completed",
logger.Info(ctx, "batch import completed",
"task_id", taskID,
"task_params", params,
"db", db,

View File

@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D
return nil, fmt.Errorf("failed to create task worker: %w", err)
}
logger.Info(ctx, "Task worker initialized",
logger.Info(ctx, "task worker initialized",
"worker_pool_size", workerCfg.PoolSize,
"queue_consumers", workerCfg.QueueConsumerCount,
)

View File

@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger {
// LogTaskMetrics records task processing metrics
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
logger.Info(m.ctx, "Task metrics",
logger.Info(m.ctx, "task metrics",
"task_type", taskType,
"status", status,
"processing_time_ms", processingTime.Milliseconds(),
@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process
// LogQueueMetrics records queue metrics
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
logger.Info(m.ctx, "Queue metrics",
logger.Info(m.ctx, "queue metrics",
"queue_depth", queueDepth,
"queue_latency_ms", queueLatency.Milliseconds(),
"metric_type", "queue",
@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati
// LogWorkerMetrics records worker metrics
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
logger.Info(m.ctx, "Worker metrics",
logger.Info(m.ctx, "worker metrics",
"active_workers", activeWorkers,
"idle_workers", idleWorkers,
"total_workers", totalWorkers,
@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker
// LogRetryMetrics records retry metrics
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
logger.Info(m.ctx, "Retry metrics",
logger.Info(m.ctx, "retry metrics",
"task_type", taskType,
"retry_count", retryCount,
"retry_success", success,
@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
logger.Info(m.ctx, "System metrics",
logger.Info(m.ctx, "system metrics",
"metric_type", "system",
"timestamp", time.Now().Unix(),
"goroutines", runtime.NumGoroutine(),
@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
duration := endTime.Sub(startTime)
logger.Info(m.ctx, "Task completion metrics",
logger.Info(m.ctx, "task completion metrics",
"metric_type", "task_completion",
"timestamp", time.Now().Unix(),
"task_id", taskID,
@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string
// LogHealthCheckMetrics records health check metrics
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
logger.Info(m.ctx, "Health check metrics",
logger.Info(m.ctx, "health check metrics",
"metric_type", "health_check",
"timestamp", time.Now().Unix(),
"healthy", healthy,

View File

@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable direct exchange
err := p.ch.ExchangeDeclare(
constants.TaskExchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err)
@ -81,12 +81,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable queue with priority support and message TTL
_, err = p.ch.QueueDeclare(
constants.TaskQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
},
)
@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error {
constants.TaskQueueName, // queue name
constants.TaskRoutingKey, // routing key
constants.TaskExchangeName, // exchange name
false, // no-wait
nil, // arguments
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
@ -148,15 +148,15 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
ctx,
constants.TaskExchangeName, // exchange
constants.TaskRoutingKey, // routing key
false, // mandatory
false, // immediate
false, // mandatory
false, // immediate
publishing,
)
if err != nil {
return fmt.Errorf("failed to publish task message: %w", err)
}
logger.Info(ctx, "Task published to queue",
logger.Info(ctx, "task published to queue",
"task_id", taskID.String(),
"task_type", taskType,
"priority", priority,
@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU
backoff := time.Duration(1<<uint(i)) * time.Second
backoff = min(backoff, 10*time.Second)
logger.Warn(ctx, "Failed to publish task, retrying",
logger.Warn(ctx, "failed to publish task, retrying",
"task_id", taskID.String(),
"attempt", i+1,
"max_retries", maxRetries,
@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error {
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
queue, err := p.ch.QueueDeclarePassive(
constants.TaskQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-max-priority": constants.TaskMaxPriority,
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
@ -246,20 +246,20 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan
case <-ctx.Done():
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
return
case msg, ok := <-taskChan:
case task, ok := <-taskChan:
if !ok {
logger.Info(ctx, "task channel closed, exiting push loop")
return
}
// Restore trace context from the handler that enqueued this message
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier))
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())),
oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())),
)
if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil {
if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil {
pubSpan.RecordError(err)
logger.Error(taskCtx, "publish task to RabbitMQ failed",
"task_id", msg.TaskID, "error", err)
"task_id", task.TaskID, "error", err)
}
pubSpan.End()
}

View File

@ -57,7 +57,7 @@ func NewExponentialBackoffRetry(maxRetries int, initialDelay, maxDelay time.Dura
// ShouldRetry implements exponential backoff with jitter
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
if retryCount >= s.MaxRetries {
logger.Info(ctx, "Task reached maximum retry count",
logger.Info(ctx, "task reached maximum retry count",
"task_id", taskID,
"retry_count", retryCount,
"max_retries", s.MaxRetries,
@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
}
}
logger.Info(ctx, "Task will be retried",
logger.Info(ctx, "task will be retried",
"task_id", taskID,
"retry_count", retryCount,
"next_retry_in", delay,

View File

@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
if !shouldRetry {
// Mark task as permanently failed
logger.Info(ctx, "Task will not be retried, marking as failed",
logger.Info(ctx, "task will not be retried, marking as failed",
"task_id", taskID,
"retry_count", retryCount,
"max_retries", q.strategy.GetMaxRetries(),
@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
}
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
// Log but don't fail the whole retry scheduling
logger.Warn(ctx, "Failed to update task error info",
logger.Warn(ctx, "failed to update task error info",
"task_id", taskID,
"error", err,
)
@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
})
if err != nil {
logger.Error(ctx, "Failed to schedule task retry",
logger.Error(ctx, "failed to schedule task retry",
"task_id", taskID,
"task_type", taskType,
"retry_count", retryCount,
@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
return err
}
logger.Info(ctx, "Task scheduled for retry",
logger.Info(ctx, "task scheduled for retry",
"task_id", taskID,
"task_type", taskType,
"retry_count", retryCount+1,
@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Get tasks due for retry
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
if err != nil {
logger.Error(ctx, "Failed to get tasks for retry", "error", err)
logger.Error(ctx, "failed to get tasks for retry", "error", err)
return err
}
@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
return nil
}
logger.Info(ctx, "Processing retry queue",
logger.Info(ctx, "processing retry queue",
"task_count", len(tasks),
"batch_size", batchSize,
)
@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Publish task to queue for immediate processing
taskType := TaskType(task.TaskType)
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
logger.Error(ctx, "Failed to publish retry task to queue",
logger.Error(ctx, "failed to publish retry task to queue",
"task_id", task.TaskID,
"task_type", taskType,
"error", err,
@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Update task status back to submitted
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
logger.Warn(ctx, "Failed to update retry task status",
logger.Warn(ctx, "failed to update retry task status",
"task_id", task.TaskID,
"error", err,
)
@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
// Clear next retry time since task is being retried now
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
logger.Warn(ctx, "Failed to clear next retry time",
logger.Warn(ctx, "failed to clear next retry time",
"task_id", task.TaskID,
"error", err,
)
}
logger.Info(ctx, "Retry task resubmitted",
logger.Info(ctx, "retry task resubmitted",
"task_id", task.TaskID,
"task_type", taskType,
"retry_count", task.RetryCount,
@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura
for {
select {
case <-ctx.Done():
logger.Info(ctx, "Retry scheduler stopping")
logger.Info(ctx, "retry scheduler stopping")
return
case <-ticker.C:
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
logger.Error(ctx, "Error processing retry queue", "error", err)
logger.Error(ctx, "error processing retry queue", "error", err)
}
}
}

View File

@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
return fmt.Errorf("invalid parameter type for TestTask")
}
logger.Info(ctx, "Starting test task executionser",
logger.Info(ctx, "starting test task executionser",
"task_id", taskID,
"sleep_duration_seconds", params.SleepDuration,
"message", params.Message,
@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
// Save result to database
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
logger.Error(ctx, "Failed to save test task result",
logger.Error(ctx, "failed to save test task result",
"task_id", taskID,
"error", err,
)
return fmt.Errorf("failed to save task result: %w", err)
}
logger.Info(ctx, "Test task completed successfully",
logger.Info(ctx, "test task completed successfully",
"task_id", taskID,
"sleep_duration_seconds", params.SleepDuration,
)
@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler {
// Execute processes a test task using the unified task interface
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
logger.Info(ctx, "Executing test task",
logger.Info(ctx, "executing test task",
"task_id", taskID,
"task_params", params,
"db", db,

View File

@ -178,30 +178,26 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
// Start begins consuming tasks from the queue
func (w *TaskWorker) Start() error {
logger.Info(w.ctx, "Starting task worker",
logger.Info(w.ctx, "starting task worker",
"pool_size", w.cfg.PoolSize,
"queue_consumers", w.cfg.QueueConsumerCount,
)
// Start multiple consumers for better throughput
for i := 0; i < w.cfg.QueueConsumerCount; i++ {
w.wg.Add(1)
go w.consumerLoop(i)
w.wg.Go(func() { w.consumerLoop(i) })
}
// Start health check goroutine
w.wg.Add(1)
go w.healthCheckLoop()
w.wg.Go(w.healthCheckLoop)
logger.Info(w.ctx, "Task worker started successfully")
logger.Info(w.ctx, "task worker started successfully")
return nil
}
// consumerLoop runs a single RabbitMQ consumer
func (w *TaskWorker) consumerLoop(consumerID int) {
defer w.wg.Done()
logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID)
logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID)
// Consume messages from the queue
msgs, err := w.ch.Consume(
@ -214,7 +210,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
nil, // args
)
if err != nil {
logger.Error(w.ctx, "Failed to start consumer",
logger.Error(w.ctx, "failed to start consumer",
"consumer_id", consumerID,
"error", err,
)
@ -224,11 +220,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
for {
select {
case <-w.stopChan:
logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID)
logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID)
return
case msg, ok := <-msgs:
if !ok {
logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID)
logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID)
return
}
@ -237,7 +233,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
w.handleMessage(msg)
})
if err != nil {
logger.Error(w.ctx, "Failed to submit task to pool",
logger.Error(w.ctx, "failed to submit task to pool",
"consumer_id", consumerID,
"error", err,
)
@ -265,7 +261,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Parse task message
var taskMsg TaskQueueMessage
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
logger.Error(ctx, "Failed to unmarshal task message", "error", err)
logger.Error(ctx, "failed to unmarshal task message", "error", err)
msg.Nack(false, false) // Reject without requeue
w.metrics.mu.Lock()
w.metrics.TotalFailed++
@ -275,7 +271,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Validate message
if !taskMsg.Validate() {
logger.Error(ctx, "Invalid task message",
logger.Error(ctx, "invalid task message",
"task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType,
)
@ -299,7 +295,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
defer span.End()
ctx = taskCtx
logger.Info(ctx, "Processing task",
logger.Info(ctx, "processing task",
"task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType,
"priority", taskMsg.Priority,
@ -307,7 +303,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to RUNNING in database
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
logger.Error(ctx, "Failed to update task status", "error", err)
logger.Error(ctx, "failed to update task status", "error", err)
msg.Nack(false, true) // Reject with requeue
w.metrics.mu.Lock()
w.metrics.TotalFailed++
@ -326,7 +322,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
processingTime := time.Since(startTime)
if err != nil {
logger.Error(ctx, "Task execution failed",
logger.Error(ctx, "task execution failed",
"task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType,
"processing_time", processingTime,
@ -335,7 +331,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to FAILED
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
logger.Error(ctx, "Failed to update task with error", "error", updateErr)
logger.Error(ctx, "failed to update task with error", "error", updateErr)
}
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
@ -353,7 +349,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Update task status to COMPLETED
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
logger.Error(ctx, "Failed to update task status to completed", "error", err)
logger.Error(ctx, "failed to update task status to completed", "error", err)
// Still ack the message since task was processed successfully
}
@ -364,7 +360,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Acknowledge message
msg.Ack(false)
logger.Info(ctx, "Task completed successfully",
logger.Info(ctx, "task completed successfully",
"task_id", taskMsg.TaskID,
"task_type", taskMsg.TaskType,
"processing_time", processingTime,
@ -398,7 +394,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
// Update task status in database
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
if err != nil {
logger.Error(ctx, "Failed to update task status in database",
logger.Error(ctx, "failed to update task status in database",
"task_id", taskID,
"status", status,
"error", err,
@ -410,7 +406,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
if status == StatusRunning {
startedAt := time.Now().Unix()
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
logger.Warn(ctx, "Failed to update task start time",
logger.Warn(ctx, "failed to update task start time",
"task_id", taskID,
"error", err,
)
@ -423,14 +419,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
finishedAt := time.Now().Unix()
if status == StatusCompleted {
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
logger.Warn(ctx, "Failed to mark task as completed",
logger.Warn(ctx, "failed to mark task as completed",
"task_id", taskID,
"error", err,
)
}
} else {
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
logger.Warn(ctx, "Failed to mark task as failed",
logger.Warn(ctx, "failed to mark task as failed",
"task_id", taskID,
"error", err,
)
@ -438,7 +434,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
}
}
logger.Debug(ctx, "Task status updated",
logger.Debug(ctx, "task status updated",
"task_id", taskID,
"status", status,
)
@ -451,16 +447,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
stackTrace := fmt.Sprintf("%+v", err)
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr)
logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr)
return updateErr
}
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr)
logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr)
return updateErr
}
logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg)
logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg)
return nil
}
@ -469,7 +465,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
handler, err := w.factory.GetHandler(taskType)
if err != nil {
logger.Error(ctx, "No handler for task type", "task_type", taskType)
logger.Error(ctx, "no handler for task type", "task_type", taskType)
msg.Nack(false, false)
return err
}
@ -478,8 +474,6 @@ func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uui
// healthCheckLoop periodically checks worker health and metrics
func (w *TaskWorker) healthCheckLoop() {
defer w.wg.Done()
ticker := time.NewTicker(w.cfg.PollingInterval)
defer ticker.Stop()
@ -519,7 +513,7 @@ func (w *TaskWorker) checkHealth() {
w.metrics.WorkersIdle = w.pool.Free()
w.metrics.LastHealthCheck = time.Now()
logger.Info(w.ctx, "Worker health check",
logger.Info(w.ctx, "worker health check",
"tasks_processed", w.metrics.TotalProcessed,
"tasks_failed", w.metrics.TotalFailed,
"tasks_success", w.metrics.TotalSuccess,
@ -536,7 +530,7 @@ func (w *TaskWorker) checkHealth() {
// Stop gracefully stops the task worker
func (w *TaskWorker) Stop() error {
logger.Info(w.ctx, "Stopping task worker")
logger.Info(w.ctx, "stopping task worker")
// Signal all goroutines to stop
close(w.stopChan)

View File

@ -7,15 +7,22 @@ import (
"github.com/redis/go-redis/v9"
)
// MapSlice define func to build a new slice by applying f to every element of s.
func MapSlice[T, U any](s []T, f func(T) U) []U {
result := make([]U, 0, len(s))
for _, item := range s {
result = append(result, f(item))
}
return result
}
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
func ConvertZSetMembersToFloat64(members []redis.Z) []float64 {
dataFloats := make([]float64, 0, len(members))
// recovery time sorted in ascending order
sortRedisZByTimeMemberAscending(members)
for _, member := range members {
dataFloats = append(dataFloats, member.Score)
}
return dataFloats
return MapSlice(members, func(member redis.Z) float64 {
return member.Score
})
}
func sortRedisZByTimeMemberAscending(data []redis.Z) {

View File

@ -1,11 +1,13 @@
// Package util provide some utility functions
package util
// GetKeysFromSet define func to get all keys from a map[string]struct{}
func GetKeysFromSet(set map[string]struct{}) []string {
keys := make([]string, 0, len(set))
for key := range set {
keys = append(keys, key)
}
return keys
import (
"maps"
"slices"
)
// GetKeysFromSet define func to get all keys from a set-like map.
// It delegates to the standard library maps/slices helpers.
func GetKeysFromSet[K comparable, V any](set map[K]V) []K {
return slices.Collect(maps.Keys(set))
}

View File

@ -1,12 +1,9 @@
// Package util provide some utility functions
package util
// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings
func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string {
targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove))
for _, target := range targetsToRemove {
targetsToRemoveSet[target] = struct{}{}
}
// RemoveTargetsFromSliceSimple define func to remove targets from a slice
func RemoveTargetsFromSliceSimple[T comparable](targetsSlice []T, targetsToRemove []T) []T {
targetsToRemoveSet := SliceToSet(targetsToRemove)
for i := len(targetsSlice) - 1; i >= 0; i-- {
if _, found := targetsToRemoveSet[targetsSlice[i]]; found {
@ -17,21 +14,21 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin
return targetsSlice
}
// SliceToSet define func to convert string slice to set
func SliceToSet(targetsSlice []string) map[string]struct{} {
set := make(map[string]struct{}, len(targetsSlice))
// SliceToSet define func to convert a slice to a set
func SliceToSet[T comparable](targetsSlice []T) map[T]struct{} {
set := make(map[T]struct{}, len(targetsSlice))
for _, target := range targetsSlice {
set[target] = struct{}{}
}
return set
}
// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates
func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) {
// DeduplicateAndReportDuplicates define func to deduplicate a slice and report duplicates
func DeduplicateAndReportDuplicates[T comparable](targetsSlice []T, sourceSlice []T) (deduplicated []T, duplicates []T) {
targetSet := SliceToSet(targetsSlice)
deduplicated = make([]string, 0, len(sourceSlice))
deduplicated = make([]T, 0, len(sourceSlice))
// duplicate items slice
duplicates = make([]string, 0, len(sourceSlice))
duplicates = make([]T, 0, len(sourceSlice))
for _, source := range sourceSlice {
if _, found := targetSet[source]; found {

84
util/typed_map.go Normal file
View File

@ -0,0 +1,84 @@
// Package util provide some utility functions
package util
import (
"iter"
"sync"
)
// TypedMap define a type-safe generic wrapper around sync.Map.
// It keeps the concurrency guarantees of sync.Map while removing the
// per-call-site type assertions previously required for every Load.
type TypedMap[K comparable, V any] struct {
m sync.Map
}
// Load define func of return the value stored for key, and whether it was found.
func (t *TypedMap[K, V]) Load(key K) (V, bool) {
value, ok := t.m.Load(key)
if !ok {
var zero V
return zero, false
}
// safe: only values of type V are ever stored through this wrapper
return value.(V), true
}
// Store define func of set the value for key.
func (t *TypedMap[K, V]) Store(key K, value V) {
t.m.Store(key, value)
}
// LoadOrStore define func of return the existing value for key if present.
// Otherwise it stores and returns the given value. loaded reports whether the
// value was already present.
func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
v, loaded := t.m.LoadOrStore(key, value)
return v.(V), loaded
}
// Swap define func of store value for key and return the previous value (if any).
// loaded reports whether a value was already present for key.
func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) {
prev, loaded := t.m.Swap(key, value)
if !loaded {
var zero V
return zero, false
}
return prev.(V), true
}
// Delete define func of remove the value for key.
func (t *TypedMap[K, V]) Delete(key K) {
t.m.Delete(key)
}
// Range define func of iterate over all key/value pairs.
// Iteration stops early if f returns false.
func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) {
t.m.Range(func(key, value any) bool {
return f(key.(K), value.(V))
})
}
// Len define func of return the number of key/value pairs currently stored.
// It walks the map, so the cost is O(n).
func (t *TypedMap[K, V]) Len() int {
count := 0
t.m.Range(func(_, _ any) bool {
count++
return true
})
return count
}
// All define func of return an iterator over all key/value pairs,
// usable directly with range-over-func. Iteration stops early if the
// consumer breaks out of the loop.
func (t *TypedMap[K, V]) All() iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
t.m.Range(func(key, value any) bool {
return yield(key.(K), value.(V))
})
}
}