Compare commits
10 Commits
develop
...
chore/mode
| Author | SHA1 | Date |
|---|---|---|
|
|
ca68cf6c18 | |
|
|
c82ad773a3 | |
|
|
82622d0d85 | |
|
|
908c713565 | |
|
|
64b6562784 | |
|
|
05c64dda14 | |
|
|
c4e892f1c7 | |
|
|
195150d9b1 | |
|
|
3309e53653 | |
|
|
c6545e29ba |
|
|
@ -32,6 +32,7 @@ go.work
|
|||
# ai config
|
||||
.cursor/
|
||||
.claude/
|
||||
.codewhale/
|
||||
.cursorrules
|
||||
.copilot/
|
||||
.chatgpt/
|
||||
|
|
|
|||
|
|
@ -19,15 +19,15 @@ const (
|
|||
|
||||
// channel name suffix
|
||||
const (
|
||||
ChannelSuffixP = "P"
|
||||
ChannelSuffixQ = "Q"
|
||||
ChannelSuffixS = "S"
|
||||
ChannelSuffixPS = "PS"
|
||||
ChannelSuffixF = "F"
|
||||
ChannelSuffixDeltaF = "deltaF"
|
||||
ChannelSuffixUAB = "UAB"
|
||||
ChannelSuffixUBC = "UBC"
|
||||
ChannelSuffixUCA = "UCA"
|
||||
ChannelSuffixP = "p"
|
||||
ChannelSuffixQ = "q"
|
||||
ChannelSuffixS = "s"
|
||||
ChannelSuffixPF = "pf"
|
||||
ChannelSuffixF = "f"
|
||||
ChannelSuffixDeltaF = "df"
|
||||
ChannelSuffixUAB = "uab"
|
||||
ChannelSuffixUBC = "ubc"
|
||||
ChannelSuffixUCA = "uca"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
|||
269
deploy/deploy.md
269
deploy/deploy.md
|
|
@ -640,6 +640,12 @@ openssl x509 -in eventrt_client_cert.pem -noout -subject
|
|||
|
||||
将服务器端三个证书文件打包为 K8s Secret(在证书文件所在目录执行):
|
||||
|
||||
```bash
|
||||
sh deploy/k8s/rabbitmq-certs-secret.sh
|
||||
```
|
||||
|
||||
该脚本等价于:
|
||||
|
||||
```bash
|
||||
kubectl create secret generic rabbitmq-certs \
|
||||
--from-file=ca_certificate.pem=./ca_certificate.pem \
|
||||
|
|
@ -695,7 +701,11 @@ kubectl apply -f deploy/k8s/pg-service.yaml
|
|||
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
|
||||
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
|
||||
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
|
||||
| **存储** | `2Gi` | PVC `postgres-data` |
|
||||
| **存储** | `6Gi` | PVC `postgres-data` |
|
||||
| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 |
|
||||
| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 |
|
||||
|
||||
> **注意:** 密码当前以明文形式存储在 `pg-configmap.yaml` 中,生产环境应将其迁移至 K8s Secret,并通过环境变量注入容器,避免将明文密码提交至版本库。
|
||||
|
||||
##### 4.4.1 等待 Pod 就绪
|
||||
|
||||
|
|
@ -703,7 +713,23 @@ kubectl apply -f deploy/k8s/pg-service.yaml
|
|||
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
|
||||
```
|
||||
|
||||
##### 4.4.2 初始化异步任务表
|
||||
##### 4.4.2 连接验证
|
||||
|
||||
```bash
|
||||
# 快速检查 PostgreSQL 是否接受连接
|
||||
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||
-- pg_isready -U postgres -d demo
|
||||
|
||||
# 进入 psql 执行简单查询确认数据库可用
|
||||
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||
-- psql -U postgres -d demo -c "SELECT current_database(), version();"
|
||||
|
||||
# 列出所有数据库(确认 demo 库已创建)
|
||||
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
|
||||
-- psql -U postgres -c "\l"
|
||||
```
|
||||
|
||||
##### 4.4.3 初始化异步任务表
|
||||
|
||||
PostgreSQL 就绪后执行 1.4 节的建表 SQL,可通过以下方式进入容器执行:
|
||||
|
||||
|
|
@ -717,14 +743,14 @@ kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metada
|
|||
-- psql -U postgres -d demo < /path/to/init.sql
|
||||
```
|
||||
|
||||
##### 4.4.3 状态检查
|
||||
##### 4.4.4 状态检查
|
||||
|
||||
```bash
|
||||
kubectl get pods -l app=postgres
|
||||
kubectl logs -l app=postgres --tail=30
|
||||
```
|
||||
|
||||
##### 4.4.4 清理
|
||||
##### 4.4.5 清理
|
||||
|
||||
```bash
|
||||
kubectl delete -f deploy/k8s/pg-service.yaml \
|
||||
|
|
@ -733,68 +759,73 @@ kubectl delete -f deploy/k8s/pg-service.yaml \
|
|||
-f deploy/k8s/pg-configmap.yaml
|
||||
```
|
||||
|
||||
#### 4.5 部署 MongoDB
|
||||
|
||||
```bash
|
||||
kubectl apply -f deploy/k8s/mongodb-secret.yaml
|
||||
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
|
||||
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
|
||||
kubectl apply -f deploy/k8s/mongodb-service.yaml
|
||||
```
|
||||
|
||||
| 参数 | 值 | 说明 |
|
||||
| :--- | :--- | :--- |
|
||||
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
|
||||
| **NodePort** | `30017` | 集群外访问端口 |
|
||||
| **用户名** | `admin` | Root 管理员 |
|
||||
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
|
||||
| **存储** | `2Gi` | PVC `mongodb-data` |
|
||||
|
||||
> **注意:** 密码存储在 `mongodb-secret.yaml` 的 `stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
|
||||
|
||||
##### 4.5.1 等待 Pod 就绪
|
||||
|
||||
```bash
|
||||
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
|
||||
```
|
||||
|
||||
##### 4.5.2 连接验证
|
||||
|
||||
```bash
|
||||
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
|
||||
-- mongosh -u admin -p coslight --authenticationDatabase admin
|
||||
```
|
||||
|
||||
##### 4.5.3 状态检查
|
||||
|
||||
```bash
|
||||
kubectl get pods -l app=mongodb
|
||||
kubectl logs -l app=mongodb --tail=30
|
||||
```
|
||||
|
||||
##### 4.5.4 清理
|
||||
|
||||
```bash
|
||||
kubectl delete -f deploy/k8s/mongodb-service.yaml \
|
||||
-f deploy/k8s/mongodb-statefulset.yaml \
|
||||
-f deploy/k8s/mongodb-pvc.yaml \
|
||||
-f deploy/k8s/mongodb-secret.yaml
|
||||
```
|
||||
|
||||
### 5\. 部署 ModelRT(Kubernetes)
|
||||
|
||||
所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。
|
||||
|
||||
#### 5.1 构建并推送镜像
|
||||
|
||||
镜像采用三阶段构建,最终基于 `scratch`:
|
||||
|
||||
| 阶段 | 基础镜像 | 作用 |
|
||||
| :--- | :--- | :--- |
|
||||
| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0`,`-trimpath -ldflags="-s -w"`) |
|
||||
| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义(UID 默认 `1000`) |
|
||||
| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 |
|
||||
|
||||
**方式一:从源码构建并加载**
|
||||
|
||||
```bash
|
||||
# 在项目根目录执行
|
||||
# 在项目根目录执行(默认运行用户 UID=1000)
|
||||
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
|
||||
|
||||
# 推送到镜像仓库(或直接加载到 Minikube)
|
||||
# 自定义运行用户 UID
|
||||
docker build -f deploy/dockerfile/modelrt.Dockerfile \
|
||||
--build-arg USER_ID=2000 \
|
||||
-t coslight/modelrt:latest .
|
||||
|
||||
# 加载到 Minikube(无需私有仓库)
|
||||
minikube image load coslight/modelrt:latest
|
||||
```
|
||||
|
||||
**方式二:直接加载已有本地镜像**
|
||||
|
||||
Ubuntu 宿主机上已存在构建好的镜像(如 `modelrt:v1`)时,无需重新构建,直接导入 Minikube:
|
||||
|
||||
```bash
|
||||
# 确认本地镜像存在
|
||||
docker images modelrt:v1
|
||||
|
||||
# 加载到 Minikube
|
||||
minikube image load modelrt:v1
|
||||
|
||||
# 验证镜像已进入 Minikube 缓存
|
||||
minikube image ls | grep modelrt
|
||||
```
|
||||
|
||||
> **注意:** `deploy/k8s/modelrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。
|
||||
|
||||
#### 5.1.1 镜像冒烟测试
|
||||
|
||||
```bash
|
||||
# 查看镜像大小(scratch 镜像预期 ≤ 25 MB)
|
||||
docker images coslight/modelrt:latest
|
||||
|
||||
# 检查镜像元信息(确认 User、Cmd、架构)
|
||||
docker inspect coslight/modelrt:latest
|
||||
|
||||
# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常)
|
||||
docker run --rm coslight/modelrt:latest
|
||||
|
||||
# 挂载示例配置做完整启动验证(Ctrl+C 退出)
|
||||
docker run --rm \
|
||||
-v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \
|
||||
-p 8080:8080 \
|
||||
coslight/modelrt:latest
|
||||
```
|
||||
|
||||
> **注意:** `scratch` 镜像不含 shell,无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`。
|
||||
|
||||
#### 5.2 创建客户端证书 Secret
|
||||
|
||||
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
|
||||
|
|
@ -864,7 +895,9 @@ kubectl delete secret modelrt-certs
|
|||
|
||||
### 6\. 部署可观测性栈(Kubernetes)
|
||||
|
||||
在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。
|
||||
在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。
|
||||
|
||||
> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`(DaemonSet)负责,它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。
|
||||
|
||||
#### 6.1 部署 Jaeger
|
||||
|
||||
|
|
@ -882,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml
|
|||
kubectl apply -f deploy/k8s/loki-service.yaml
|
||||
```
|
||||
|
||||
#### 6.3 部署 Promtail
|
||||
#### 6.3 部署 Alloy
|
||||
|
||||
```bash
|
||||
kubectl apply -f deploy/k8s/promtail-rbac.yaml
|
||||
kubectl apply -f deploy/k8s/promtail-configmap.yaml
|
||||
kubectl apply -f deploy/k8s/promtail-daemonset.yaml
|
||||
kubectl apply -f deploy/k8s/alloy-rbac.yaml
|
||||
kubectl apply -f deploy/k8s/alloy-configmap.yaml
|
||||
kubectl apply -f deploy/k8s/alloy-daemonset.yaml
|
||||
```
|
||||
|
||||
> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole`(`alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml` 的 `config.alloy` 中。
|
||||
|
||||
#### 6.4 部署 Grafana
|
||||
|
||||
```bash
|
||||
|
|
@ -907,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
|
|||
-f deploy/k8s/loki-pvc.yaml \
|
||||
-f deploy/k8s/loki-deployment.yaml \
|
||||
-f deploy/k8s/loki-service.yaml \
|
||||
-f deploy/k8s/promtail-rbac.yaml \
|
||||
-f deploy/k8s/promtail-configmap.yaml \
|
||||
-f deploy/k8s/promtail-daemonset.yaml \
|
||||
-f deploy/k8s/alloy-rbac.yaml \
|
||||
-f deploy/k8s/alloy-configmap.yaml \
|
||||
-f deploy/k8s/alloy-daemonset.yaml \
|
||||
-f deploy/k8s/grafana-configmap.yaml \
|
||||
-f deploy/k8s/grafana-deployment.yaml \
|
||||
-f deploy/k8s/grafana-service.yaml
|
||||
|
|
@ -955,7 +990,6 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101)
|
|||
|
||||
```bash
|
||||
ssh -L 5432:192.168.49.2:30432 \
|
||||
-L 27017:192.168.49.2:30017 \
|
||||
-L 5671:192.168.49.2:30671 \
|
||||
-L 15671:192.168.49.2:31671 \
|
||||
-L 6379:192.168.49.2:30001 \
|
||||
|
|
@ -971,7 +1005,6 @@ ssh -L 5432:192.168.49.2:30432 \
|
|||
```bash
|
||||
ssh -fN \
|
||||
-L 5432:192.168.49.2:30432 \
|
||||
-L 27017:192.168.49.2:30017 \
|
||||
-L 5671:192.168.49.2:30671 \
|
||||
-L 15671:192.168.49.2:31671 \
|
||||
-L 6379:192.168.49.2:30001 \
|
||||
|
|
@ -987,7 +1020,6 @@ ssh -fN \
|
|||
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
|
||||
| :--- | :--- | :--- | :--- |
|
||||
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
|
||||
| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` |
|
||||
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
|
||||
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
|
||||
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
|
||||
|
|
@ -1011,14 +1043,111 @@ kill <PID>
|
|||
|
||||
### 8\. 后续操作(停止与清理)
|
||||
|
||||
#### 8.1 停止容器
|
||||
#### 8.1 本地 Docker 部署清理
|
||||
|
||||
适用于第 1、2 节使用 `docker run` 启动的 PostgreSQL 和 Redis 容器。
|
||||
|
||||
```bash
|
||||
# 停止容器
|
||||
docker stop postgres redis
|
||||
```
|
||||
|
||||
#### 8.2 删除容器(删除后数据将丢失)
|
||||
|
||||
```bash
|
||||
# 删除容器(容器内数据将同步丢失)
|
||||
docker rm postgres redis
|
||||
```
|
||||
|
||||
#### 8.2 本地运行清理
|
||||
|
||||
适用于第 3 节以 `go run` 或编译后二进制方式在本地启动的 ModelRT 服务。
|
||||
|
||||
前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程:
|
||||
|
||||
```bash
|
||||
# 终止 go run 启动的进程
|
||||
pkill -f "go run main.go"
|
||||
|
||||
# 或终止编译后的二进制进程
|
||||
pkill model-rt
|
||||
```
|
||||
|
||||
#### 8.3 K8s(Minikube) 部署清理
|
||||
|
||||
适用于第 4、5、6 节在 Minikube 中部署的所有资源。
|
||||
|
||||
##### 8.3.1 分服务清理
|
||||
|
||||
**仅停止(缩容至 0,PVC 数据保留)**
|
||||
|
||||
将所有 Deployment 和 StatefulSet 缩容至 0 副本,Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。
|
||||
|
||||
```bash
|
||||
# 停止所有 Deployment(Redis / RabbitMQ / ModelRT / Jaeger / Loki / Grafana)
|
||||
kubectl scale deployment --all --replicas=0
|
||||
|
||||
# 停止所有 StatefulSet(PostgreSQL,PVC 数据保留)
|
||||
kubectl scale statefulset --all --replicas=0
|
||||
```
|
||||
|
||||
恢复时:
|
||||
|
||||
```bash
|
||||
kubectl scale deployment --all --replicas=1
|
||||
kubectl scale statefulset --all --replicas=1
|
||||
```
|
||||
|
||||
> **注意:** DaemonSet(Alloy)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。
|
||||
|
||||
---
|
||||
|
||||
**永久清理(删除所有资源,包含 PVC,数据不可恢复)**
|
||||
|
||||
按部署顺序反向删除各服务资源:
|
||||
|
||||
```bash
|
||||
# 可观测性栈(Grafana / Alloy / Loki / Jaeger)
|
||||
kubectl delete -f deploy/k8s/grafana-service.yaml \
|
||||
-f deploy/k8s/grafana-deployment.yaml \
|
||||
-f deploy/k8s/grafana-configmap.yaml \
|
||||
-f deploy/k8s/alloy-daemonset.yaml \
|
||||
-f deploy/k8s/alloy-configmap.yaml \
|
||||
-f deploy/k8s/alloy-rbac.yaml \
|
||||
-f deploy/k8s/loki-service.yaml \
|
||||
-f deploy/k8s/loki-deployment.yaml \
|
||||
-f deploy/k8s/loki-pvc.yaml \
|
||||
-f deploy/k8s/loki-configmap.yaml \
|
||||
-f deploy/k8s/jaeger-service.yaml \
|
||||
-f deploy/k8s/jaeger-deployment.yaml
|
||||
|
||||
# ModelRT 应用
|
||||
kubectl delete -f deploy/k8s/modelrt-service.yaml \
|
||||
-f deploy/k8s/modelrt-deployment.yaml \
|
||||
-f deploy/k8s/modelrt-configmap.yaml \
|
||||
-f deploy/k8s/modelrt-secret.yaml
|
||||
kubectl delete secret modelrt-certs
|
||||
|
||||
# PostgreSQL
|
||||
kubectl delete -f deploy/k8s/pg-service.yaml \
|
||||
-f deploy/k8s/pg-statefulset.yaml \
|
||||
-f deploy/k8s/pg-pvc.yaml \
|
||||
-f deploy/k8s/pg-configmap.yaml
|
||||
|
||||
# RabbitMQ
|
||||
kubectl delete -f deploy/k8s/rabbitmq-service.yaml \
|
||||
-f deploy/k8s/rabbitmq-deployment.yaml \
|
||||
-f deploy/k8s/rabbitmq-users-config.yaml \
|
||||
-f deploy/k8s/rabbitmq-config.yaml \
|
||||
-f deploy/k8s/rabbitmq-secret.yaml
|
||||
kubectl delete secret rabbitmq-certs
|
||||
|
||||
# Redis
|
||||
kubectl delete -f deploy/k8s/redis-service.yaml \
|
||||
-f deploy/k8s/redis-deployment.yaml
|
||||
```
|
||||
|
||||
##### 8.3.2 一键清理
|
||||
|
||||
> **注意:** 此操作会删除 `deploy/k8s/` 下所有 YAML 对应的 K8s 资源,包括 PVC,**持久化数据将永久丢失**,请确认后执行。
|
||||
|
||||
```bash
|
||||
kubectl delete -f deploy/k8s/
|
||||
kubectl delete secret rabbitmq-certs modelrt-certs
|
||||
```
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.25-alpine AS builder
|
||||
FROM golang:1.26-alpine AS builder
|
||||
RUN apk --no-cache upgrade
|
||||
|
||||
WORKDIR /app
|
||||
|
|
@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \
|
|||
-mod=readonly \
|
||||
-o modelrt main.go
|
||||
|
||||
# Prepare runtime dependencies in a pinned Alpine stage so they can be
|
||||
# copied into scratch without pulling any vulnerable OS packages at run time.
|
||||
# prepare runtime dependencies in a pinned alpine stage so they can be
|
||||
# copied into scratch without pulling any vulnerable os packages at run time.
|
||||
FROM alpine:3.21 AS certs
|
||||
ARG USER_ID=1000
|
||||
RUN apk --no-cache add ca-certificates tzdata && \
|
||||
|
|
@ -21,15 +21,14 @@ RUN apk --no-cache add ca-certificates tzdata && \
|
|||
FROM scratch
|
||||
# CA certificates required for TLS connections (RabbitMQ amqps://)
|
||||
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
|
||||
# Timezone data
|
||||
# timezone data
|
||||
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
|
||||
# Non-root user/group definitions
|
||||
# non-root user/group definitions
|
||||
COPY --from=certs /etc/passwd /etc/passwd
|
||||
COPY --from=certs /etc/group /etc/group
|
||||
|
||||
WORKDIR /app
|
||||
COPY --from=builder /app/modelrt ./modelrt
|
||||
COPY configs/config.example.yaml ./configs/config.example.yaml
|
||||
|
||||
USER modelrt
|
||||
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ data:
|
|||
- name: Loki
|
||||
type: loki
|
||||
access: proxy
|
||||
url: http://loki:3100
|
||||
url: http://loki-service:3100
|
||||
isDefault: true
|
||||
jsonData:
|
||||
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
|
||||
|
|
@ -23,4 +23,4 @@ data:
|
|||
type: jaeger
|
||||
uid: jaeger
|
||||
access: proxy
|
||||
url: http://jaeger:16686
|
||||
url: http://jaeger-service:16686
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ spec:
|
|||
containers:
|
||||
- name: grafana
|
||||
image: grafana/grafana:10.4.2
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 3000
|
||||
env:
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: grafana
|
||||
name: grafana-service
|
||||
namespace: default
|
||||
spec:
|
||||
ports:
|
||||
- name: http
|
||||
port: 3000
|
||||
targetPort: 3000
|
||||
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
|
||||
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
|
||||
selector:
|
||||
app: grafana
|
||||
type: NodePort
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ spec:
|
|||
containers:
|
||||
- name: jaeger
|
||||
image: jaegertracing/all-in-one:1.56
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: COLLECTOR_OTLP_ENABLED
|
||||
value: "true"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: jaeger
|
||||
name: jaeger-service
|
||||
labels:
|
||||
app: jaeger
|
||||
spec:
|
||||
|
|
@ -9,19 +9,19 @@ spec:
|
|||
- name: ui
|
||||
port: 16686
|
||||
targetPort: 16686
|
||||
nodePort: 31686 # Jaeger UI,浏览器访问 http://<NodeIP>:31686
|
||||
nodePort: 31686 # Jaeger UI,浏览器访问 http://<NodeIP>:31686
|
||||
- name: collector-http
|
||||
port: 14268
|
||||
targetPort: 14268
|
||||
nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel)
|
||||
nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel)
|
||||
- name: otlp-http
|
||||
port: 4318
|
||||
targetPort: 4318
|
||||
nodePort: 31318 # OTLP HTTP,集群外使用 <NodeIP>:31318
|
||||
nodePort: 31318 # OTLP HTTP,集群外使用 <NodeIP>:31318
|
||||
- name: otlp-grpc
|
||||
port: 4317
|
||||
targetPort: 4317
|
||||
nodePort: 31317 # OTLP gRPC,集群外使用 <NodeIP>:31317
|
||||
nodePort: 31317 # OTLP gRPC,集群外使用 <NodeIP>:31317
|
||||
selector:
|
||||
app: jaeger
|
||||
type: NodePort
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ spec:
|
|||
containers:
|
||||
- name: loki
|
||||
image: grafana/loki:2.9.4
|
||||
imagePullPolicy: IfNotPresent
|
||||
args:
|
||||
- -config.file=/etc/loki/loki.yaml
|
||||
ports:
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: loki
|
||||
name: loki-service
|
||||
namespace: default
|
||||
spec:
|
||||
ports:
|
||||
- name: http
|
||||
port: 3100
|
||||
targetPort: 3100
|
||||
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
|
||||
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
|
||||
selector:
|
||||
app: loki
|
||||
type: NodePort
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ metadata:
|
|||
data:
|
||||
config.yaml: |
|
||||
postgres:
|
||||
host: "192.168.1.101"
|
||||
host: "postgres-service"
|
||||
port: 5432
|
||||
database: "demo"
|
||||
user: "postgres"
|
||||
|
|
@ -35,7 +35,7 @@ data:
|
|||
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
|
||||
|
||||
otel:
|
||||
endpoint: "jaeger:4318"
|
||||
endpoint: "jaeger-service:4318"
|
||||
insecure: true
|
||||
|
||||
ants:
|
||||
|
|
@ -77,7 +77,7 @@ data:
|
|||
service_addr: ":8080"
|
||||
service_name: "modelRT"
|
||||
secret_key: "" # injected via env SERVICE_SECRET_KEY
|
||||
deploy_env: "production"
|
||||
deploy_env: "development"
|
||||
|
||||
dataRT:
|
||||
host: "http://127.0.0.1"
|
||||
|
|
|
|||
|
|
@ -16,8 +16,9 @@ spec:
|
|||
spec:
|
||||
containers:
|
||||
- name: modelrt
|
||||
image: coslight/modelrt:latest
|
||||
image: modelrt:v1
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["/app/modelrt"]
|
||||
args:
|
||||
- "-modelRT_config_dir=/app/configs"
|
||||
- "-modelRT_config_name=config"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: mongodb
|
||||
name: mongodb-service
|
||||
labels:
|
||||
app: mongodb
|
||||
spec:
|
||||
|
|
|
|||
|
|
@ -34,9 +34,9 @@ spec:
|
|||
- mongosh
|
||||
- --eval
|
||||
- "db.adminCommand('ping')"
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 5
|
||||
timeoutSeconds: 3
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 10
|
||||
failureThreshold: 12
|
||||
livenessProbe:
|
||||
exec:
|
||||
|
|
@ -44,10 +44,10 @@ spec:
|
|||
- mongosh
|
||||
- --eval
|
||||
- "db.adminCommand('ping')"
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 20
|
||||
timeoutSeconds: 3
|
||||
failureThreshold: 3
|
||||
initialDelaySeconds: 120
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 30
|
||||
failureThreshold: 5
|
||||
resources:
|
||||
requests:
|
||||
cpu: 100m
|
||||
|
|
|
|||
|
|
@ -7,4 +7,4 @@ spec:
|
|||
- ReadWriteOnce
|
||||
resources:
|
||||
requests:
|
||||
storage: 2Gi
|
||||
storage: 6Gi
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: postgres
|
||||
name: postgres-service
|
||||
labels:
|
||||
app: postgres
|
||||
spec:
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ data:
|
|||
filename: /tmp/positions.yaml
|
||||
|
||||
clients:
|
||||
- url: http://loki:3100/loki/api/v1/push
|
||||
- url: http://loki-service:3100/loki/api/v1/push
|
||||
|
||||
scrape_configs:
|
||||
- job_name: kubernetes-pods
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ spec:
|
|||
containers:
|
||||
- name: promtail
|
||||
image: grafana/promtail:2.9.4
|
||||
imagePullPolicy: IfNotPresent
|
||||
args:
|
||||
- -config.file=/etc/promtail/promtail.yaml
|
||||
ports:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
#!/bin/sh
|
||||
# Create the rabbitmq server certificate secret.
|
||||
# Run this script from the directory that contains the three cert files,
|
||||
# or adjust the paths below to point at the actual files.
|
||||
#
|
||||
# Expected files (generated during RabbitMQ TLS setup):
|
||||
# ca_certificate.pem
|
||||
# server_certificate.pem
|
||||
# server_key.pem
|
||||
|
||||
kubectl create secret generic rabbitmq-certs \
|
||||
--from-file=ca_certificate.pem=./ca_certificate.pem \
|
||||
--from-file=server_certificate.pem=./server_certificate.pem \
|
||||
--from-file=server_key.pem=./server_key.pem
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: eventrt-rabbitmq
|
||||
name: rabbitmq
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
|
|
@ -15,6 +15,7 @@ spec:
|
|||
containers:
|
||||
- name: rabbitmq
|
||||
image: rabbitmq:4.1.1-management-alpine
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 4369
|
||||
- containerPort: 5671
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rabbit-plugins-conf
|
||||
data:
|
||||
enabled_plugins: |
|
||||
[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch].
|
||||
|
|
@ -15,6 +15,7 @@ spec:
|
|||
containers:
|
||||
- name: redis
|
||||
image: redis/redis-stack-server:latest
|
||||
imagePullPolicy: IfNotPresent
|
||||
resources:
|
||||
limits:
|
||||
memory: "128Mi"
|
||||
|
|
|
|||
|
|
@ -2,24 +2,20 @@
|
|||
package diagram
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
// anchorValueOverview define struct of storage all anchor value
|
||||
var anchorValueOverview sync.Map
|
||||
// anchorValueOverview define struct of storage all anchor value keyed by component uuid
|
||||
var anchorValueOverview util.TypedMap[string, string]
|
||||
|
||||
// GetAnchorValue define func of get circuit diagram data by componentID
|
||||
func GetAnchorValue(componentUUID string) (string, error) {
|
||||
value, ok := diagramsOverview.Load(componentUUID)
|
||||
anchorValue, ok := anchorValueOverview.Load(componentUUID)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID)
|
||||
}
|
||||
anchorValue, ok := value.(string)
|
||||
if !ok {
|
||||
return "", errors.New("convert to string failed")
|
||||
}
|
||||
return anchorValue, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,32 +2,27 @@
|
|||
package diagram
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"modelRT/orm"
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
// diagramsOverview define struct of storage all circuit diagram data
|
||||
var diagramsOverview sync.Map
|
||||
// diagramsOverview define struct of storage all circuit diagram data keyed by component uuid
|
||||
var diagramsOverview util.TypedMap[string, *orm.Component]
|
||||
|
||||
// GetComponentMap define func of get circuit diagram data by component uuid
|
||||
func GetComponentMap(componentUUID string) (*orm.Component, error) {
|
||||
value, ok := diagramsOverview.Load(componentUUID)
|
||||
componentInfo, ok := diagramsOverview.Load(componentUUID)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID)
|
||||
}
|
||||
componentInfo, ok := value.(*orm.Component)
|
||||
if !ok {
|
||||
return nil, errors.New("convert to component map struct failed")
|
||||
}
|
||||
return componentInfo, nil
|
||||
}
|
||||
|
||||
// UpdateComponentMap define func of update circuit diagram data by component uuid and component info
|
||||
func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool {
|
||||
_, result := diagramsOverview.Swap(componentID, componentInfo)
|
||||
func UpdateComponentMap(componentUUID string, componentInfo *orm.Component) bool {
|
||||
_, result := diagramsOverview.Swap(componentUUID, componentInfo)
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -65,9 +65,7 @@ func (g *Graph) AddEdge(from, to uuid.UUID) {
|
|||
|
||||
// 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中
|
||||
// 则将其移除
|
||||
if _, exist := g.FreeVertexs[toKey]; exist {
|
||||
delete(g.FreeVertexs, toKey)
|
||||
}
|
||||
delete(g.FreeVertexs, toKey)
|
||||
}
|
||||
|
||||
// DelNode delete a node to the graph
|
||||
|
|
|
|||
|
|
@ -3,8 +3,6 @@ package diagram
|
|||
|
||||
import (
|
||||
"context"
|
||||
"iter"
|
||||
"maps"
|
||||
|
||||
locker "modelRT/distributedlock"
|
||||
"modelRT/logger"
|
||||
|
|
@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error)
|
|||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
type Comparer[T any] interface {
|
||||
Compare(T) int
|
||||
}
|
||||
|
||||
type ComparableComparer[T any] interface {
|
||||
Compare(T) int
|
||||
comparable // 直接嵌入 comparable 约束
|
||||
}
|
||||
|
||||
type methodNode[E Comparer[E]] struct {
|
||||
value E
|
||||
left *methodNode[E]
|
||||
right *methodNode[E]
|
||||
}
|
||||
|
||||
type MethodTree[E Comparer[E]] struct {
|
||||
root *methodNode[E]
|
||||
}
|
||||
|
||||
type OrderedSet[E interface {
|
||||
comparable
|
||||
Comparer[E]
|
||||
}] struct {
|
||||
tree MethodTree[E]
|
||||
elements map[E]bool
|
||||
}
|
||||
|
||||
type ComparableOrderedSet[E ComparableComparer[E]] struct {
|
||||
tree MethodTree[E]
|
||||
elements map[E]bool
|
||||
}
|
||||
|
||||
type Set[E any] interface {
|
||||
Insert(E)
|
||||
Delete(E)
|
||||
Has(E) bool
|
||||
All() iter.Seq[E]
|
||||
}
|
||||
|
||||
func InsertAll[E any](set Set[E], seq iter.Seq[E]) {
|
||||
for v := range seq {
|
||||
set.Insert(v)
|
||||
}
|
||||
}
|
||||
|
||||
type HashSet[E comparable] map[E]bool
|
||||
|
||||
func (s HashSet[E]) Insert(v E) { s[v] = true }
|
||||
func (s HashSet[E]) Delete(v E) { delete(s, v) }
|
||||
func (s HashSet[E]) Has(v E) bool { return s[v] }
|
||||
func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) }
|
||||
|
|
|
|||
|
|
@ -2,32 +2,27 @@
|
|||
package diagram
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
// graphOverview define struct of storage all circuit diagram topologic data
|
||||
var graphOverview sync.Map
|
||||
// graphOverview define struct of storage all circuit diagram topologic data keyed by pageID
|
||||
var graphOverview util.TypedMap[int64, *Graph]
|
||||
|
||||
// PrintGrapMap define func of print circuit diagram topologic info data
|
||||
func PrintGrapMap() {
|
||||
graphOverview.Range(func(key, value any) bool {
|
||||
fmt.Println(key, value)
|
||||
return true
|
||||
})
|
||||
for pageID, graph := range graphOverview.All() {
|
||||
fmt.Println(pageID, graph)
|
||||
}
|
||||
}
|
||||
|
||||
// GetGraphMap define func of get circuit diagram topologic data by pageID
|
||||
func GetGraphMap(pageID int64) (*Graph, error) {
|
||||
value, ok := graphOverview.Load(pageID)
|
||||
graph, ok := graphOverview.Load(pageID)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("can not find graph by pageID:%d", pageID)
|
||||
}
|
||||
graph, ok := value.(*Graph)
|
||||
if !ok {
|
||||
return nil, errors.New("convert to graph struct failed")
|
||||
}
|
||||
return graph, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
|
|||
c.JSON(http.StatusOK, resp)
|
||||
return
|
||||
}
|
||||
diagram.UpdateComponentMap(info.ID, component)
|
||||
diagram.UpdateComponentMap(info.UUID, component)
|
||||
}
|
||||
|
||||
if len(request.FreeVertexs) > 0 {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ package logger
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
|
@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface {
|
|||
|
||||
// Info define func for implementing gormLogger.Interface
|
||||
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
|
||||
Info(ctx, msg, "data", data)
|
||||
Info(ctx, fmt.Sprintf(msg, data...))
|
||||
}
|
||||
|
||||
// Warn define func for implementing gormLogger.Interface
|
||||
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
|
||||
Warn(ctx, msg, "data", data)
|
||||
Warn(ctx, fmt.Sprintf(msg, data...))
|
||||
}
|
||||
|
||||
// Error define func for implementing gormLogger.Interface
|
||||
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
|
||||
Error(ctx, msg, "data", data)
|
||||
Error(ctx, fmt.Sprintf(msg, data...))
|
||||
}
|
||||
|
||||
// Trace define func for implementing gormLogger.Interface
|
||||
|
|
|
|||
|
|
@ -47,8 +47,7 @@ func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
|
|||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
ch: make(chan string, 512),
|
||||
}
|
||||
ls.wg.Add(1)
|
||||
go ls.run()
|
||||
ls.wg.Go(ls.run)
|
||||
return ls
|
||||
}
|
||||
|
||||
|
|
@ -70,7 +69,6 @@ func (ls *lokiSyncer) Sync() error {
|
|||
}
|
||||
|
||||
func (ls *lokiSyncer) run() {
|
||||
defer ls.wg.Done()
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
|
|||
8
main.go
8
main.go
|
|
@ -179,8 +179,8 @@ func main() {
|
|||
// init async task worker
|
||||
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to initialize task worker", "error", err)
|
||||
// Continue without task worker, but log warning
|
||||
logger.Error(ctx, "failed to initialize task worker", "error", err)
|
||||
// continue without task worker, but log warning
|
||||
} else {
|
||||
go taskWorker.Start()
|
||||
defer taskWorker.Stop()
|
||||
|
|
@ -258,7 +258,7 @@ func main() {
|
|||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
engine := gin.New()
|
||||
// 添加CORS中间件
|
||||
// add CORS middleware
|
||||
engine.Use(cors.New(cors.Config{
|
||||
AllowOrigins: []string{"*"}, // 或指定具体域名
|
||||
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
|
||||
|
|
@ -278,7 +278,7 @@ func main() {
|
|||
Handler: engine,
|
||||
}
|
||||
|
||||
// creating a System Signal Receiver
|
||||
// creating a system signal receiver
|
||||
done := make(chan os.Signal, 10)
|
||||
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/orm"
|
||||
"modelRT/util"
|
||||
|
||||
"github.com/RediSearch/redisearch-go/v2/redisearch"
|
||||
)
|
||||
|
|
@ -63,10 +64,9 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
|||
}
|
||||
|
||||
safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags)
|
||||
gridSug := make([]redisearch.Suggestion, 0, len(measSet.AllGridTags))
|
||||
for _, gridTag := range measSet.AllGridTags {
|
||||
gridSug = append(gridSug, redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore})
|
||||
}
|
||||
gridSug := util.MapSlice(measSet.AllGridTags, func(gridTag string) redisearch.Suggestion {
|
||||
return redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}
|
||||
})
|
||||
ac.AddTerms(gridSug...)
|
||||
|
||||
safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags)
|
||||
|
|
@ -78,19 +78,16 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
|||
|
||||
// building the grid -> zones hierarchy
|
||||
for gridTag, zoneTags := range measSet.GridToZoneTags {
|
||||
sug := make([]redisearch.Suggestion, 0, len(zoneTags))
|
||||
for _, zoneTag := range zoneTags {
|
||||
term := fmt.Sprintf("%s.%s", gridTag, zoneTag)
|
||||
// add redis fuzzy search suggestion for token1-token7 type
|
||||
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
|
||||
}
|
||||
// add redis fuzzy search suggestion for token1-token7 type
|
||||
sug := util.MapSlice(zoneTags, func(zoneTag string) redisearch.Suggestion {
|
||||
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s", gridTag, zoneTag), Score: constants.DefaultScore}
|
||||
})
|
||||
safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags)
|
||||
ac.AddTerms(sug...)
|
||||
}
|
||||
|
||||
// building the zone -> stations hierarchy
|
||||
for zoneTag, stationTags := range measSet.ZoneToStationTags {
|
||||
sug := make([]redisearch.Suggestion, 0, len(stationTags))
|
||||
gridTag, exists := zoneToGridPath[zoneTag]
|
||||
if !exists {
|
||||
err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag)
|
||||
|
|
@ -98,11 +95,10 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
for _, stationTag := range stationTags {
|
||||
// add redis fuzzy search suggestion for token1-token7 type
|
||||
term := fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag)
|
||||
sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore})
|
||||
}
|
||||
// add redis fuzzy search suggestion for token1-token7 type
|
||||
sug := util.MapSlice(stationTags, func(stationTag string) redisearch.Suggestion {
|
||||
return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag), Score: constants.DefaultScore}
|
||||
})
|
||||
|
||||
safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags)
|
||||
ac.AddTerms(sug...)
|
||||
|
|
|
|||
|
|
@ -59,36 +59,36 @@ func NewPower104DataSource(station string, packet, offset int) (*MeasurementData
|
|||
}
|
||||
|
||||
func generateChannelName(prefix string, number int, suffix string) (string, error) {
|
||||
// shortPrefix is the literal prefix written into the channel name (tm/ts/tc),
|
||||
// maxNumber is the inclusive upper bound, padWidth is the zero-padded digit width.
|
||||
var shortPrefix string
|
||||
var maxNumber, padWidth int
|
||||
switch prefix {
|
||||
case constants.ChannelPrefixTelemetry:
|
||||
if number > 10 {
|
||||
return "", common.ErrExceedsLimitType
|
||||
}
|
||||
var builder strings.Builder
|
||||
numberStr := strconv.Itoa(number)
|
||||
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
|
||||
builder.WriteString(prefix)
|
||||
builder.WriteString(numberStr)
|
||||
builder.WriteString(suffix)
|
||||
channelName := builder.String()
|
||||
return channelName, nil
|
||||
shortPrefix, maxNumber, padWidth = "tm", 8, 1
|
||||
case constants.ChannelPrefixTelesignal:
|
||||
var numberStr string
|
||||
if number < 10 {
|
||||
numberStr = "0" + strconv.Itoa(number)
|
||||
}
|
||||
numberStr = strconv.Itoa(number)
|
||||
|
||||
var builder strings.Builder
|
||||
builder.Grow(len(prefix) + len(numberStr) + len(suffix))
|
||||
builder.WriteString(prefix)
|
||||
builder.WriteString(numberStr)
|
||||
builder.WriteString(suffix)
|
||||
channelName := builder.String()
|
||||
return channelName, nil
|
||||
shortPrefix, maxNumber, padWidth = "ts", 16, 2
|
||||
case constants.ChannelPrefixTelecommand:
|
||||
shortPrefix, maxNumber, padWidth = "tc", 9, 1
|
||||
default:
|
||||
return "", common.ErrUnsupportedChannelPrefixType
|
||||
}
|
||||
|
||||
if number < 1 || number > maxNumber {
|
||||
return "", common.ErrExceedsLimitType
|
||||
}
|
||||
|
||||
numberStr := strconv.Itoa(number)
|
||||
if len(numberStr) < padWidth {
|
||||
numberStr = strings.Repeat("0", padWidth-len(numberStr)) + numberStr
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
builder.Grow(len(shortPrefix) + len(numberStr) + len(suffix))
|
||||
builder.WriteString(shortPrefix)
|
||||
builder.WriteString(numberStr)
|
||||
builder.WriteString(suffix)
|
||||
return builder.String(), nil
|
||||
}
|
||||
|
||||
// NewTelemetryChannel define func of generate telemetry channel CL3611 data source
|
||||
|
|
@ -109,6 +109,15 @@ func NewTelesignalChannel(station, device, channelNameSuffix string, channelNumb
|
|||
return NewCL3611DataSource(station, device, channelName)
|
||||
}
|
||||
|
||||
// NewTelecommandChannel define func of generate telecommand channel CL3611 data source
|
||||
func NewTelecommandChannel(station, device, channelNameSuffix string, channelNumber int) (*MeasurementDataSource, error) {
|
||||
channelName, err := generateChannelName(constants.ChannelPrefixTelecommand, channelNumber, channelNameSuffix)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate channel name: %w", err)
|
||||
}
|
||||
return NewCL3611DataSource(station, device, channelName)
|
||||
}
|
||||
|
||||
// NewStandardChannel define func of generate standard channel CL3611 data source
|
||||
func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) {
|
||||
return NewCL3611DataSource(station, device, channelType)
|
||||
|
|
@ -264,9 +273,9 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) {
|
|||
func concatP104WithPlus(station string, packet int, offset int) string {
|
||||
packetStr := strconv.Itoa(packet)
|
||||
offsetStr := strconv.Itoa(offset)
|
||||
return station + ":" + packetStr + ":" + offsetStr
|
||||
return strings.ToLower(station + ":104:" + packetStr + ":" + offsetStr)
|
||||
}
|
||||
|
||||
func concatCL361WithPlus(station, device, channel string) string {
|
||||
return station + ":" + device + ":" + "phasor" + ":" + channel
|
||||
return strings.ToLower(station + ":" + device + ":" + "phasor" + ":" + channel)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
package realtimedata
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
// ComputeConfig define struct of measurement computation
|
||||
|
|
@ -19,54 +19,15 @@ type ComputeConfig struct {
|
|||
Analyzer RealTimeAnalyzer
|
||||
}
|
||||
|
||||
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
||||
// MeasComputeState define struct of manages the state of measurement
|
||||
// computations. It embeds util.TypedMap to inherit the concurrency-safe,
|
||||
// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without
|
||||
// per-call-site type assertions.
|
||||
type MeasComputeState struct {
|
||||
measMap sync.Map
|
||||
util.TypedMap[string, *ComputeConfig]
|
||||
}
|
||||
|
||||
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
|
||||
func NewMeasComputeState() *MeasComputeState {
|
||||
return &MeasComputeState{}
|
||||
}
|
||||
|
||||
// Store define func to store a compute configuration for the specified key
|
||||
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
|
||||
m.measMap.Store(key, config)
|
||||
}
|
||||
|
||||
// Load define func to retrieve the compute configuration for the specified key
|
||||
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
|
||||
value, ok := m.measMap.Load(key)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return value.(*ComputeConfig), true
|
||||
}
|
||||
|
||||
// Delete define func to remove the compute configuration for the specified key
|
||||
func (m *MeasComputeState) Delete(key string) {
|
||||
m.measMap.Delete(key)
|
||||
}
|
||||
|
||||
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
|
||||
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
|
||||
value, loaded := m.measMap.LoadOrStore(key, config)
|
||||
return value.(*ComputeConfig), loaded
|
||||
}
|
||||
|
||||
// Range define func to iterate over all key-configuration pairs in the map
|
||||
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
|
||||
m.measMap.Range(func(key, value any) bool {
|
||||
return f(key.(string), value.(*ComputeConfig))
|
||||
})
|
||||
}
|
||||
|
||||
// Len define func to return the number of compute configurations in the map
|
||||
func (m *MeasComputeState) Len() int {
|
||||
count := 0
|
||||
m.measMap.Range(func(_, _ any) bool {
|
||||
count++
|
||||
return true
|
||||
})
|
||||
return count
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType,
|
|||
defer f.mu.Unlock()
|
||||
|
||||
f.handlers[taskType] = handler
|
||||
logger.Info(ctx, "Handler registered",
|
||||
logger.Info(ctx, "handler registered",
|
||||
"task_type", taskType,
|
||||
"handler_name", handler.Name(),
|
||||
)
|
||||
|
|
@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler {
|
|||
|
||||
// Execute processes an event analysis task
|
||||
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Starting event analysis",
|
||||
logger.Info(ctx, "starting event analysis",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
)
|
||||
|
|
@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa
|
|||
// 4. Storing results in database
|
||||
|
||||
// Simulate work
|
||||
logger.Info(ctx, "Event analysis completed",
|
||||
logger.Info(ctx, "event analysis completed",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler {
|
|||
|
||||
// Execute processes a batch import task
|
||||
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Starting batch import",
|
||||
logger.Info(ctx, "starting batch import",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para
|
|||
// 4. Generating import report
|
||||
|
||||
// Simulate work
|
||||
logger.Info(ctx, "Batch import completed",
|
||||
logger.Info(ctx, "batch import completed",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D
|
|||
return nil, fmt.Errorf("failed to create task worker: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task worker initialized",
|
||||
logger.Info(ctx, "task worker initialized",
|
||||
"worker_pool_size", workerCfg.PoolSize,
|
||||
"queue_consumers", workerCfg.QueueConsumerCount,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger {
|
|||
|
||||
// LogTaskMetrics records task processing metrics
|
||||
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
|
||||
logger.Info(m.ctx, "Task metrics",
|
||||
logger.Info(m.ctx, "task metrics",
|
||||
"task_type", taskType,
|
||||
"status", status,
|
||||
"processing_time_ms", processingTime.Milliseconds(),
|
||||
|
|
@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process
|
|||
|
||||
// LogQueueMetrics records queue metrics
|
||||
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
|
||||
logger.Info(m.ctx, "Queue metrics",
|
||||
logger.Info(m.ctx, "queue metrics",
|
||||
"queue_depth", queueDepth,
|
||||
"queue_latency_ms", queueLatency.Milliseconds(),
|
||||
"metric_type", "queue",
|
||||
|
|
@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati
|
|||
|
||||
// LogWorkerMetrics records worker metrics
|
||||
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
|
||||
logger.Info(m.ctx, "Worker metrics",
|
||||
logger.Info(m.ctx, "worker metrics",
|
||||
"active_workers", activeWorkers,
|
||||
"idle_workers", idleWorkers,
|
||||
"total_workers", totalWorkers,
|
||||
|
|
@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker
|
|||
|
||||
// LogRetryMetrics records retry metrics
|
||||
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
|
||||
logger.Info(m.ctx, "Retry metrics",
|
||||
logger.Info(m.ctx, "retry metrics",
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount,
|
||||
"retry_success", success,
|
||||
|
|
@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
|||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
logger.Info(m.ctx, "System metrics",
|
||||
logger.Info(m.ctx, "system metrics",
|
||||
"metric_type", "system",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"goroutines", runtime.NumGoroutine(),
|
||||
|
|
@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
|||
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
|
||||
duration := endTime.Sub(startTime)
|
||||
|
||||
logger.Info(m.ctx, "Task completion metrics",
|
||||
logger.Info(m.ctx, "task completion metrics",
|
||||
"metric_type", "task_completion",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"task_id", taskID,
|
||||
|
|
@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string
|
|||
|
||||
// LogHealthCheckMetrics records health check metrics
|
||||
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
|
||||
logger.Info(m.ctx, "Health check metrics",
|
||||
logger.Info(m.ctx, "health check metrics",
|
||||
"metric_type", "health_check",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"healthy", healthy,
|
||||
|
|
|
|||
|
|
@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error {
|
|||
// Declare durable direct exchange
|
||||
err := p.ch.ExchangeDeclare(
|
||||
constants.TaskExchangeName, // name
|
||||
"direct", // type
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
"direct", // type
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to declare exchange: %w", err)
|
||||
|
|
@ -81,12 +81,12 @@ func (p *QueueProducer) declareInfrastructure() error {
|
|||
// Declare durable queue with priority support and message TTL
|
||||
_, err = p.ch.QueueDeclare(
|
||||
constants.TaskQueueName, // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
amqp.Table{
|
||||
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
|
||||
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
|
||||
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
|
||||
},
|
||||
)
|
||||
|
|
@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error {
|
|||
constants.TaskQueueName, // queue name
|
||||
constants.TaskRoutingKey, // routing key
|
||||
constants.TaskExchangeName, // exchange name
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to bind queue: %w", err)
|
||||
|
|
@ -148,15 +148,15 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
|
|||
ctx,
|
||||
constants.TaskExchangeName, // exchange
|
||||
constants.TaskRoutingKey, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
publishing,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to publish task message: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task published to queue",
|
||||
logger.Info(ctx, "task published to queue",
|
||||
"task_id", taskID.String(),
|
||||
"task_type", taskType,
|
||||
"priority", priority,
|
||||
|
|
@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU
|
|||
backoff := time.Duration(1<<uint(i)) * time.Second
|
||||
backoff = min(backoff, 10*time.Second)
|
||||
|
||||
logger.Warn(ctx, "Failed to publish task, retrying",
|
||||
logger.Warn(ctx, "failed to publish task, retrying",
|
||||
"task_id", taskID.String(),
|
||||
"attempt", i+1,
|
||||
"max_retries", maxRetries,
|
||||
|
|
@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error {
|
|||
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
||||
queue, err := p.ch.QueueDeclarePassive(
|
||||
constants.TaskQueueName, // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
amqp.Table{
|
||||
"x-max-priority": constants.TaskMaxPriority,
|
||||
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
||||
|
|
@ -246,20 +246,20 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan
|
|||
case <-ctx.Done():
|
||||
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
|
||||
return
|
||||
case msg, ok := <-taskChan:
|
||||
case task, ok := <-taskChan:
|
||||
if !ok {
|
||||
logger.Info(ctx, "task channel closed, exiting push loop")
|
||||
return
|
||||
}
|
||||
// Restore trace context from the handler that enqueued this message
|
||||
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
|
||||
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier))
|
||||
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
|
||||
oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())),
|
||||
oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())),
|
||||
)
|
||||
if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil {
|
||||
if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil {
|
||||
pubSpan.RecordError(err)
|
||||
logger.Error(taskCtx, "publish task to RabbitMQ failed",
|
||||
"task_id", msg.TaskID, "error", err)
|
||||
"task_id", task.TaskID, "error", err)
|
||||
}
|
||||
pubSpan.End()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ func NewExponentialBackoffRetry(maxRetries int, initialDelay, maxDelay time.Dura
|
|||
// ShouldRetry implements exponential backoff with jitter
|
||||
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
|
||||
if retryCount >= s.MaxRetries {
|
||||
logger.Info(ctx, "Task reached maximum retry count",
|
||||
logger.Info(ctx, "task reached maximum retry count",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"max_retries", s.MaxRetries,
|
||||
|
|
@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
|
|||
}
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task will be retried",
|
||||
logger.Info(ctx, "task will be retried",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"next_retry_in", delay,
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
|
||||
if !shouldRetry {
|
||||
// Mark task as permanently failed
|
||||
logger.Info(ctx, "Task will not be retried, marking as failed",
|
||||
logger.Info(ctx, "task will not be retried, marking as failed",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"max_retries", q.strategy.GetMaxRetries(),
|
||||
|
|
@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
}
|
||||
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
|
||||
// Log but don't fail the whole retry scheduling
|
||||
logger.Warn(ctx, "Failed to update task error info",
|
||||
logger.Warn(ctx, "failed to update task error info",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to schedule task retry",
|
||||
logger.Error(ctx, "failed to schedule task retry",
|
||||
"task_id", taskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount,
|
||||
|
|
@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
return err
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task scheduled for retry",
|
||||
logger.Info(ctx, "task scheduled for retry",
|
||||
"task_id", taskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount+1,
|
||||
|
|
@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
// Get tasks due for retry
|
||||
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to get tasks for retry", "error", err)
|
||||
logger.Error(ctx, "failed to get tasks for retry", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Processing retry queue",
|
||||
logger.Info(ctx, "processing retry queue",
|
||||
"task_count", len(tasks),
|
||||
"batch_size", batchSize,
|
||||
)
|
||||
|
|
@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
// Publish task to queue for immediate processing
|
||||
taskType := TaskType(task.TaskType)
|
||||
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
|
||||
logger.Error(ctx, "Failed to publish retry task to queue",
|
||||
logger.Error(ctx, "failed to publish retry task to queue",
|
||||
"task_id", task.TaskID,
|
||||
"task_type", taskType,
|
||||
"error", err,
|
||||
|
|
@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
|
||||
// Update task status back to submitted
|
||||
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
|
||||
logger.Warn(ctx, "Failed to update retry task status",
|
||||
logger.Warn(ctx, "failed to update retry task status",
|
||||
"task_id", task.TaskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
|
||||
// Clear next retry time since task is being retried now
|
||||
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
|
||||
logger.Warn(ctx, "Failed to clear next retry time",
|
||||
logger.Warn(ctx, "failed to clear next retry time",
|
||||
"task_id", task.TaskID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Retry task resubmitted",
|
||||
logger.Info(ctx, "retry task resubmitted",
|
||||
"task_id", task.TaskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", task.RetryCount,
|
||||
|
|
@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "Retry scheduler stopping")
|
||||
logger.Info(ctx, "retry scheduler stopping")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
|
||||
logger.Error(ctx, "Error processing retry queue", "error", err)
|
||||
logger.Error(ctx, "error processing retry queue", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
|||
return fmt.Errorf("invalid parameter type for TestTask")
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Starting test task executionser",
|
||||
logger.Info(ctx, "starting test task executionser",
|
||||
"task_id", taskID,
|
||||
"sleep_duration_seconds", params.SleepDuration,
|
||||
"message", params.Message,
|
||||
|
|
@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
|||
|
||||
// Save result to database
|
||||
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
|
||||
logger.Error(ctx, "Failed to save test task result",
|
||||
logger.Error(ctx, "failed to save test task result",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
return fmt.Errorf("failed to save task result: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Test task completed successfully",
|
||||
logger.Info(ctx, "test task completed successfully",
|
||||
"task_id", taskID,
|
||||
"sleep_duration_seconds", params.SleepDuration,
|
||||
)
|
||||
|
|
@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler {
|
|||
|
||||
// Execute processes a test task using the unified task interface
|
||||
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Executing test task",
|
||||
logger.Info(ctx, "executing test task",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
|
|||
|
|
@ -178,30 +178,26 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
|
|||
|
||||
// Start begins consuming tasks from the queue
|
||||
func (w *TaskWorker) Start() error {
|
||||
logger.Info(w.ctx, "Starting task worker",
|
||||
logger.Info(w.ctx, "starting task worker",
|
||||
"pool_size", w.cfg.PoolSize,
|
||||
"queue_consumers", w.cfg.QueueConsumerCount,
|
||||
)
|
||||
|
||||
// Start multiple consumers for better throughput
|
||||
for i := 0; i < w.cfg.QueueConsumerCount; i++ {
|
||||
w.wg.Add(1)
|
||||
go w.consumerLoop(i)
|
||||
w.wg.Go(func() { w.consumerLoop(i) })
|
||||
}
|
||||
|
||||
// Start health check goroutine
|
||||
w.wg.Add(1)
|
||||
go w.healthCheckLoop()
|
||||
w.wg.Go(w.healthCheckLoop)
|
||||
|
||||
logger.Info(w.ctx, "Task worker started successfully")
|
||||
logger.Info(w.ctx, "task worker started successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// consumerLoop runs a single RabbitMQ consumer
|
||||
func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||
defer w.wg.Done()
|
||||
|
||||
logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID)
|
||||
logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID)
|
||||
|
||||
// Consume messages from the queue
|
||||
msgs, err := w.ch.Consume(
|
||||
|
|
@ -214,7 +210,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error(w.ctx, "Failed to start consumer",
|
||||
logger.Error(w.ctx, "failed to start consumer",
|
||||
"consumer_id", consumerID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -224,11 +220,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
for {
|
||||
select {
|
||||
case <-w.stopChan:
|
||||
logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID)
|
||||
logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID)
|
||||
return
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID)
|
||||
logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -237,7 +233,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
w.handleMessage(msg)
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(w.ctx, "Failed to submit task to pool",
|
||||
logger.Error(w.ctx, "failed to submit task to pool",
|
||||
"consumer_id", consumerID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -265,7 +261,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
// Parse task message
|
||||
var taskMsg TaskQueueMessage
|
||||
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
|
||||
logger.Error(ctx, "Failed to unmarshal task message", "error", err)
|
||||
logger.Error(ctx, "failed to unmarshal task message", "error", err)
|
||||
msg.Nack(false, false) // Reject without requeue
|
||||
w.metrics.mu.Lock()
|
||||
w.metrics.TotalFailed++
|
||||
|
|
@ -275,7 +271,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Validate message
|
||||
if !taskMsg.Validate() {
|
||||
logger.Error(ctx, "Invalid task message",
|
||||
logger.Error(ctx, "invalid task message",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
)
|
||||
|
|
@ -299,7 +295,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
defer span.End()
|
||||
ctx = taskCtx
|
||||
|
||||
logger.Info(ctx, "Processing task",
|
||||
logger.Info(ctx, "processing task",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"priority", taskMsg.Priority,
|
||||
|
|
@ -307,7 +303,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to RUNNING in database
|
||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
|
||||
logger.Error(ctx, "Failed to update task status", "error", err)
|
||||
logger.Error(ctx, "failed to update task status", "error", err)
|
||||
msg.Nack(false, true) // Reject with requeue
|
||||
w.metrics.mu.Lock()
|
||||
w.metrics.TotalFailed++
|
||||
|
|
@ -326,7 +322,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
processingTime := time.Since(startTime)
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Task execution failed",
|
||||
logger.Error(ctx, "task execution failed",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"processing_time", processingTime,
|
||||
|
|
@ -335,7 +331,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to FAILED
|
||||
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task with error", "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task with error", "error", updateErr)
|
||||
}
|
||||
|
||||
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
|
||||
|
|
@ -353,7 +349,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to COMPLETED
|
||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
|
||||
logger.Error(ctx, "Failed to update task status to completed", "error", err)
|
||||
logger.Error(ctx, "failed to update task status to completed", "error", err)
|
||||
// Still ack the message since task was processed successfully
|
||||
}
|
||||
|
||||
|
|
@ -364,7 +360,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
// Acknowledge message
|
||||
msg.Ack(false)
|
||||
|
||||
logger.Info(ctx, "Task completed successfully",
|
||||
logger.Info(ctx, "task completed successfully",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"processing_time", processingTime,
|
||||
|
|
@ -398,7 +394,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
// Update task status in database
|
||||
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to update task status in database",
|
||||
logger.Error(ctx, "failed to update task status in database",
|
||||
"task_id", taskID,
|
||||
"status", status,
|
||||
"error", err,
|
||||
|
|
@ -410,7 +406,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
if status == StatusRunning {
|
||||
startedAt := time.Now().Unix()
|
||||
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to update task start time",
|
||||
logger.Warn(ctx, "failed to update task start time",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -423,14 +419,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
finishedAt := time.Now().Unix()
|
||||
if status == StatusCompleted {
|
||||
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to mark task as completed",
|
||||
logger.Warn(ctx, "failed to mark task as completed",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to mark task as failed",
|
||||
logger.Warn(ctx, "failed to mark task as failed",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -438,7 +434,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
}
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "Task status updated",
|
||||
logger.Debug(ctx, "task status updated",
|
||||
"task_id", taskID,
|
||||
"status", status,
|
||||
)
|
||||
|
|
@ -451,16 +447,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
|||
stackTrace := fmt.Sprintf("%+v", err)
|
||||
|
||||
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr)
|
||||
return updateErr
|
||||
}
|
||||
|
||||
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr)
|
||||
return updateErr
|
||||
}
|
||||
|
||||
logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg)
|
||||
logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -469,7 +465,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
|||
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
|
||||
handler, err := w.factory.GetHandler(taskType)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "No handler for task type", "task_type", taskType)
|
||||
logger.Error(ctx, "no handler for task type", "task_type", taskType)
|
||||
msg.Nack(false, false)
|
||||
return err
|
||||
}
|
||||
|
|
@ -478,8 +474,6 @@ func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uui
|
|||
|
||||
// healthCheckLoop periodically checks worker health and metrics
|
||||
func (w *TaskWorker) healthCheckLoop() {
|
||||
defer w.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(w.cfg.PollingInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
@ -519,7 +513,7 @@ func (w *TaskWorker) checkHealth() {
|
|||
w.metrics.WorkersIdle = w.pool.Free()
|
||||
w.metrics.LastHealthCheck = time.Now()
|
||||
|
||||
logger.Info(w.ctx, "Worker health check",
|
||||
logger.Info(w.ctx, "worker health check",
|
||||
"tasks_processed", w.metrics.TotalProcessed,
|
||||
"tasks_failed", w.metrics.TotalFailed,
|
||||
"tasks_success", w.metrics.TotalSuccess,
|
||||
|
|
@ -536,7 +530,7 @@ func (w *TaskWorker) checkHealth() {
|
|||
|
||||
// Stop gracefully stops the task worker
|
||||
func (w *TaskWorker) Stop() error {
|
||||
logger.Info(w.ctx, "Stopping task worker")
|
||||
logger.Info(w.ctx, "stopping task worker")
|
||||
|
||||
// Signal all goroutines to stop
|
||||
close(w.stopChan)
|
||||
|
|
|
|||
|
|
@ -7,15 +7,22 @@ import (
|
|||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// MapSlice define func to build a new slice by applying f to every element of s.
|
||||
func MapSlice[T, U any](s []T, f func(T) U) []U {
|
||||
result := make([]U, 0, len(s))
|
||||
for _, item := range s {
|
||||
result = append(result, f(item))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
||||
func ConvertZSetMembersToFloat64(members []redis.Z) []float64 {
|
||||
dataFloats := make([]float64, 0, len(members))
|
||||
// recovery time sorted in ascending order
|
||||
sortRedisZByTimeMemberAscending(members)
|
||||
for _, member := range members {
|
||||
dataFloats = append(dataFloats, member.Score)
|
||||
}
|
||||
return dataFloats
|
||||
return MapSlice(members, func(member redis.Z) float64 {
|
||||
return member.Score
|
||||
})
|
||||
}
|
||||
|
||||
func sortRedisZByTimeMemberAscending(data []redis.Z) {
|
||||
|
|
|
|||
16
util/map.go
16
util/map.go
|
|
@ -1,11 +1,13 @@
|
|||
// Package util provide some utility functions
|
||||
package util
|
||||
|
||||
// GetKeysFromSet define func to get all keys from a map[string]struct{}
|
||||
func GetKeysFromSet(set map[string]struct{}) []string {
|
||||
keys := make([]string, 0, len(set))
|
||||
for key := range set {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
return keys
|
||||
import (
|
||||
"maps"
|
||||
"slices"
|
||||
)
|
||||
|
||||
// GetKeysFromSet define func to get all keys from a set-like map.
|
||||
// It delegates to the standard library maps/slices helpers.
|
||||
func GetKeysFromSet[K comparable, V any](set map[K]V) []K {
|
||||
return slices.Collect(maps.Keys(set))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,9 @@
|
|||
// Package util provide some utility functions
|
||||
package util
|
||||
|
||||
// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings
|
||||
func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string {
|
||||
targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove))
|
||||
for _, target := range targetsToRemove {
|
||||
targetsToRemoveSet[target] = struct{}{}
|
||||
}
|
||||
// RemoveTargetsFromSliceSimple define func to remove targets from a slice
|
||||
func RemoveTargetsFromSliceSimple[T comparable](targetsSlice []T, targetsToRemove []T) []T {
|
||||
targetsToRemoveSet := SliceToSet(targetsToRemove)
|
||||
|
||||
for i := len(targetsSlice) - 1; i >= 0; i-- {
|
||||
if _, found := targetsToRemoveSet[targetsSlice[i]]; found {
|
||||
|
|
@ -17,21 +14,21 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin
|
|||
return targetsSlice
|
||||
}
|
||||
|
||||
// SliceToSet define func to convert string slice to set
|
||||
func SliceToSet(targetsSlice []string) map[string]struct{} {
|
||||
set := make(map[string]struct{}, len(targetsSlice))
|
||||
// SliceToSet define func to convert a slice to a set
|
||||
func SliceToSet[T comparable](targetsSlice []T) map[T]struct{} {
|
||||
set := make(map[T]struct{}, len(targetsSlice))
|
||||
for _, target := range targetsSlice {
|
||||
set[target] = struct{}{}
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates
|
||||
func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) {
|
||||
// DeduplicateAndReportDuplicates define func to deduplicate a slice and report duplicates
|
||||
func DeduplicateAndReportDuplicates[T comparable](targetsSlice []T, sourceSlice []T) (deduplicated []T, duplicates []T) {
|
||||
targetSet := SliceToSet(targetsSlice)
|
||||
deduplicated = make([]string, 0, len(sourceSlice))
|
||||
deduplicated = make([]T, 0, len(sourceSlice))
|
||||
// duplicate items slice
|
||||
duplicates = make([]string, 0, len(sourceSlice))
|
||||
duplicates = make([]T, 0, len(sourceSlice))
|
||||
|
||||
for _, source := range sourceSlice {
|
||||
if _, found := targetSet[source]; found {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
// Package util provide some utility functions
|
||||
package util
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TypedMap define a type-safe generic wrapper around sync.Map.
|
||||
// It keeps the concurrency guarantees of sync.Map while removing the
|
||||
// per-call-site type assertions previously required for every Load.
|
||||
type TypedMap[K comparable, V any] struct {
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
// Load define func of return the value stored for key, and whether it was found.
|
||||
func (t *TypedMap[K, V]) Load(key K) (V, bool) {
|
||||
value, ok := t.m.Load(key)
|
||||
if !ok {
|
||||
var zero V
|
||||
return zero, false
|
||||
}
|
||||
// safe: only values of type V are ever stored through this wrapper
|
||||
return value.(V), true
|
||||
}
|
||||
|
||||
// Store define func of set the value for key.
|
||||
func (t *TypedMap[K, V]) Store(key K, value V) {
|
||||
t.m.Store(key, value)
|
||||
}
|
||||
|
||||
// LoadOrStore define func of return the existing value for key if present.
|
||||
// Otherwise it stores and returns the given value. loaded reports whether the
|
||||
// value was already present.
|
||||
func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
|
||||
v, loaded := t.m.LoadOrStore(key, value)
|
||||
return v.(V), loaded
|
||||
}
|
||||
|
||||
// Swap define func of store value for key and return the previous value (if any).
|
||||
// loaded reports whether a value was already present for key.
|
||||
func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) {
|
||||
prev, loaded := t.m.Swap(key, value)
|
||||
if !loaded {
|
||||
var zero V
|
||||
return zero, false
|
||||
}
|
||||
return prev.(V), true
|
||||
}
|
||||
|
||||
// Delete define func of remove the value for key.
|
||||
func (t *TypedMap[K, V]) Delete(key K) {
|
||||
t.m.Delete(key)
|
||||
}
|
||||
|
||||
// Range define func of iterate over all key/value pairs.
|
||||
// Iteration stops early if f returns false.
|
||||
func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) {
|
||||
t.m.Range(func(key, value any) bool {
|
||||
return f(key.(K), value.(V))
|
||||
})
|
||||
}
|
||||
|
||||
// Len define func of return the number of key/value pairs currently stored.
|
||||
// It walks the map, so the cost is O(n).
|
||||
func (t *TypedMap[K, V]) Len() int {
|
||||
count := 0
|
||||
t.m.Range(func(_, _ any) bool {
|
||||
count++
|
||||
return true
|
||||
})
|
||||
return count
|
||||
}
|
||||
|
||||
// All define func of return an iterator over all key/value pairs,
|
||||
// usable directly with range-over-func. Iteration stops early if the
|
||||
// consumer breaks out of the loop.
|
||||
func (t *TypedMap[K, V]) All() iter.Seq2[K, V] {
|
||||
return func(yield func(K, V) bool) {
|
||||
t.m.Range(func(key, value any) bool {
|
||||
return yield(key.(K), value.(V))
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue