feat: add dedicated message-exchange for task lifecycle notifications

- add constants/message.go with MessageTask* categories and message-exchange /
    message-queue / dead-letter routing constants
  - add mq/publish_message.go with PushMessageToRabbitMQ (confirm mode,
    dead-letter queue) separate from the existing event-exchange publisher
  - add mq/emit.go with TryEmitMessage for non-blocking, OTel-traced dispatch
  - add mq/event/task_event_gen.go with NewTaskSubmitted/Running/Completed/
    Failed/CancelledMessage constructors
  - wire TryEmitMessage into task worker and create/cancel handlers so all 5
    lifecycle transitions are published (previously task.* routed to
    event-exchange with no matching binding, causing silent drops)
  - harden Dockerfile: scratch final image, pinned alpine:3.21 certs stage,
    apk upgrade in builder, add -trimpath -mod=readonly go build flags
  - add full K8s manifests under deploy/k8s/ for Redis, RabbitMQ (mTLS),
    ModelRT (Downward API, scratch image, readOnlyRootFilesystem), Jaeger,
    Loki, Promtail, Grafana
  - expand deploy.md with async_task SQL schema, TLS cert generation steps,
    K8s deployment procedures, and SSH tunnel configuration
This commit is contained in:
douxu 2026-05-13 16:58:36 +08:00
parent cccd4becdc
commit 42956d1793
35 changed files with 1688 additions and 40 deletions

View File

@ -88,3 +88,10 @@ const (
// EventCriticalUpDownLimitCategroy define category for critical up and down limit event
EventCriticalUpDownLimitCategroy = "event.critical.updown.limit"
)
// Category identifiers for general task events. Both values live under the
// "event.general.task." prefix.
const (
// EventTaskGeneralTestCategory is the category string that identifies
// events produced by the test task type.
EventTaskGeneralTestCategory = "event.general.task.test"
// EventTaskGeneralTopologyAnalyzeCategory is the category string that
// identifies events produced by the topology-analyze task type.
EventTaskGeneralTopologyAnalyzeCategory = "event.general.task.topology_analyze"
)

33
constants/message.go Normal file
View File

@ -0,0 +1,33 @@
// Package constants define constant variable
package constants
// Exchange names used by the message (as opposed to event) publishing path.
const (
// MessageExchangeName is the name of the exchange dedicated to "message.*"
// notifications.
MessageExchangeName = "message-exchange"
// MessageDeadExchangeName is the name of the dead-letter exchange that
// accompanies MessageExchangeName.
// NOTE(review): the dead-letter wiring is not declared in this file —
// confirm against the publisher that declares the queue arguments.
MessageDeadExchangeName = "message-dead-letter-exchange"
)
// Binding keys and queue names used by the message publishing path.
const (
// MessageRoutingKey is the binding pattern for MessageQueueName. Every
// Message*Category value declared in this file starts with "message.", so
// they are all intended to match it.
// NOTE(review): the "#" wildcard only has this meaning on a topic
// exchange — confirm the exchange type where it is declared.
MessageRoutingKey = "message.#"
// MessageDeadRoutingKey is the binding key for MessageDeadQueueName.
// NOTE(review): presumably a catch-all under topic-exchange semantics —
// confirm the dead-letter exchange type.
MessageDeadRoutingKey = "#"
// MessageQueueName is the name of the queue that receives messages.
MessageQueueName = "message-queue"
// MessageDeadQueueName is the name of the dead-letter queue for messages.
MessageDeadQueueName = "message-dead-letter-queue"
)
// Category identifiers for task lifecycle messages, one per transition.
// All values share the "message.task." prefix.
const (
// MessageTaskSubmittedCategory is the category of the message announcing
// that a task was submitted.
MessageTaskSubmittedCategory = "message.task.submitted"
// MessageTaskRunningCategory is the category of the message announcing
// that a task is running.
MessageTaskRunningCategory = "message.task.running"
// MessageTaskCompletedCategory is the category of the message announcing
// that a task completed.
MessageTaskCompletedCategory = "message.task.completed"
// MessageTaskFailedCategory is the category of the message announcing
// that a task failed.
MessageTaskFailedCategory = "message.task.failed"
// MessageTaskCancelledCategory is the category of the message announcing
// that a task was cancelled.
MessageTaskCancelledCategory = "message.task.cancelled"
)

View File

@ -45,6 +45,68 @@ docker ps -a grep postgres
docker logs postgres
```
#### 1.4 初始化异步任务表
$\text{PostgreSQL}$ 启动后执行以下建表语句,创建异步任务系统所需的两张表:
```sql
-- ==========================================
-- 表: async_task
-- 说明: 存储异步任务的生命周期跟踪信息
-- ==========================================
CREATE TABLE IF NOT EXISTS async_task (
task_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
params JSONB,
created_at BIGINT NOT NULL,
finished_at BIGINT,
started_at BIGINT,
execution_time BIGINT,
progress INTEGER,
retry_count INTEGER DEFAULT 0,
max_retry_count INTEGER DEFAULT 3,
next_retry_time BIGINT,
retry_delay INTEGER DEFAULT 5000,
priority INTEGER DEFAULT 5,
queue_name VARCHAR(100) DEFAULT 'default',
worker_id VARCHAR(50),
failure_reason TEXT,
stack_trace TEXT,
created_by VARCHAR(100)
);
CREATE INDEX IF NOT EXISTS idx_async_task_task_type ON async_task(task_type);
CREATE INDEX IF NOT EXISTS idx_async_task_status ON async_task(status);
CREATE INDEX IF NOT EXISTS idx_async_task_created_at ON async_task(created_at);
CREATE INDEX IF NOT EXISTS idx_async_task_finished_at ON async_task(finished_at);
CREATE INDEX IF NOT EXISTS idx_async_task_started_at ON async_task(started_at);
CREATE INDEX IF NOT EXISTS idx_async_task_next_retry_time ON async_task(next_retry_time);
CREATE INDEX IF NOT EXISTS idx_async_task_priority ON async_task(priority);
CREATE INDEX IF NOT EXISTS idx_async_task_status_retry ON async_task(status, next_retry_time)
WHERE status = 'FAILED' AND next_retry_time IS NOT NULL;
-- ==========================================
-- 表: async_task_result
-- 说明: 存储异步任务的执行结果
-- ==========================================
CREATE TABLE IF NOT EXISTS async_task_result (
task_id UUID PRIMARY KEY,
result JSONB,
error_code INTEGER,
error_message TEXT,
error_detail JSONB,
execution_time BIGINT NOT NULL DEFAULT 0,
memory_usage BIGINT,
cpu_usage DOUBLE PRECISION,
retry_count INTEGER DEFAULT 0,
completed_at BIGINT NOT NULL
);
COMMENT ON TABLE async_task IS '异步任务生命周期跟踪表';
COMMENT ON TABLE async_task_result IS '异步任务执行结果表';
```
### 2\. 部署 Redis Stack Server
我们将使用 `redis/redis-stack-server:latest` 镜像,该镜像内置了 $\text{Redisearch}$ 模块,用于 $\text{ModelRT}$ 项目中补全功能。
@ -401,15 +463,453 @@ go build -o model-rt main.go
在发现控制台输出如下信息`starting ModelRT server`
后即代表服务启动成功
### 4\. 后续操作(停止与清理
### 4\. 部署基础依赖(Kubernetes)
#### 4.1 停止容器
Redis 和 RabbitMQ 部署在 Minikube 中,YAML 文件位于 `deploy/k8s/`。RabbitMQ 启用双向 TLS(mTLS),客户端以 X.509 证书的 CN 字段作为用户名进行认证。
#### 4.1 部署 Redis
```bash
kubectl apply -f deploy/k8s/redis-deployment.yaml
kubectl apply -f deploy/k8s/redis-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `redis/redis-stack-server:latest` | 内置 Redisearch 模块 |
| **NodePort** | `30001` | 集群外访问端口 |
#### 4.2 RabbitMQ TLS 证书生成
RabbitMQ 配置为仅允许 TLS 连接(`listeners.tcp = none`),所有客户端须持有由同一 CA 签发的证书。
##### 4.2.1 生成根 CA
```bash
# 克隆 tls-gen 工具
git clone https://github.com/rabbitmq/tls-gen.git
cd tls-gen/basic
# 生成根 CA(结果在 result/ 目录)
make CN=rabbitmq-server
# ca_certificate.pem 和 ca_key.pem 生成于 result/
```
##### 4.2.2 生成服务器证书
服务器证书需包含 SAN(Subject Alternative Name),使其同时匹配集群内 DNS 和 Minikube IP。
创建 `server.cnf`:
```text
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = rabbitmq-server
[v3_server]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names
[alt_names]
DNS.1 = rabbitmq-server
DNS.2 = rabbitmq-service.default.svc.cluster.local
DNS.3 = localhost
IP.1 = 192.168.49.2
IP.2 = 127.0.0.1
```
生成证书:
```bash
# 将 ca_certificate.pem 和 ca_key.pem(即 cakey.pem)放在当前目录
openssl genrsa -out server_key.pem 2048
openssl req -new -key server_key.pem -out server_cert.csr -config server.cnf
openssl x509 -req -in server_cert.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out server_certificate.pem -days 730 -sha256 \
-extfile server.cnf -extensions v3_server
rm server_cert.csr
```
##### 4.2.3 生成 ModelRT 客户端证书
CN 必须与 RabbitMQ 中注册的用户名一致(`modelrt-client`)。
创建 `modelrt.cnf`:
```text
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = modelrt-client
[v3_client]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth
```
生成证书:
```bash
openssl genrsa -out modelrt_client_key.pem 2048
openssl req -new -key modelrt_client_key.pem \
-out modelrt_client.csr -config modelrt.cnf
openssl x509 -req -in modelrt_client.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out modelrt_client_cert.pem -days 365 \
-extensions v3_client -extfile modelrt.cnf
rm modelrt_client.csr
```
##### 4.2.4 生成 EventRT 客户端证书
创建 `eventrt.cnf`,CN 改为 `eventrt-client`:
```text
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
C = CN
ST = Beijing
L = Beijing
O = coslight
CN = eventrt-client
[v3_client]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth
```
生成证书:
```bash
openssl genrsa -out eventrt_client_key.pem 2048
openssl req -new -key eventrt_client_key.pem \
-out eventrt_client.csr -config eventrt.cnf
openssl x509 -req -in eventrt_client.csr \
-CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
-out eventrt_client_cert.pem -days 365 \
-extensions v3_client -extfile eventrt.cnf
rm eventrt_client.csr
```
##### 4.2.5 验证证书
```bash
# 验证服务器证书
openssl verify -CAfile ca_certificate.pem server_certificate.pem
# 验证客户端证书
openssl verify -CAfile ca_certificate.pem modelrt_client_cert.pem
openssl verify -CAfile ca_certificate.pem eventrt_client_cert.pem
# 查看证书详情(确认 CN 和 SAN)
openssl x509 -in server_certificate.pem -noout -subject -ext subjectAltName
openssl x509 -in modelrt_client_cert.pem -noout -subject
openssl x509 -in eventrt_client_cert.pem -noout -subject
```
#### 4.3 部署 RabbitMQ
##### 4.3.1 创建证书 Secret
将服务器端三个证书文件打包为 K8s Secret,在证书文件所在目录执行:
```bash
kubectl create secret generic rabbitmq-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=server_certificate.pem=./server_certificate.pem \
--from-file=server_key.pem=./server_key.pem
```
##### 4.3.2 部署
```bash
kubectl apply -f deploy/k8s/rabbitmq-secret.yaml
kubectl apply -f deploy/k8s/rabbitmq-config.yaml
kubectl apply -f deploy/k8s/rabbitmq-users-config.yaml
kubectl apply -f deploy/k8s/rabbitmq-deployment.yaml
kubectl apply -f deploy/k8s/rabbitmq-service.yaml
```
##### 4.3.3 端口汇总
| 端口 | NodePort | 说明 |
| :--- | :--- | :--- |
| `5671` | `30671` | AMQP over TLS(客户端连接) |
| `5672` | `30672` | AMQP 明文(内部备用,生产禁用) |
| `15671` | `31671` | Management UI over TLS |
| `15672` | `31672` | Management UI 明文(内部备用) |
##### 4.3.4 用户与权限说明
用户定义在 `rabbitmq-users-config.yaml` 的 `definitions.json` 中,通过 `load_definitions` 启动时自动加载:
| 用户 | 认证方式 | 权限 | 说明 |
| :--- | :--- | :--- | :--- |
| `coslight` | 密码 | administrator | 管理员,密码在 rabbitmq-secret.yaml |
| `modelrt-client` | X.509 证书(CN) | configure/read/write | ModelRT 服务专用 |
| `eventrt-client` | X.509 证书(CN) | configure/read/write | EventRT 服务专用 |
| `web-client` | X.509 证书(CN) | read/write | Web 客户端 |
> **注意:** 证书认证用户的 `password_hash` 留空,RabbitMQ 通过 `ssl_cert_login_from = common_name` 将证书 CN 映射为用户名。
### 5\. 部署 ModelRT(Kubernetes)
所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。
#### 5.1 构建并推送镜像
```bash
# 在项目根目录执行
docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest .
# 推送到镜像仓库(或直接加载到 Minikube)
minikube image load coslight/modelrt:latest
```
#### 5.2 创建客户端证书 Secret
在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行:
```bash
sh deploy/k8s/modelrt-certs-secret.sh
```
该脚本等价于:
```bash
kubectl create secret generic modelrt-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=modelrt_client_cert.pem=./modelrt_client_cert.pem \
--from-file=modelrt_client_key.pem=./modelrt_client_key.pem
```
#### 5.3 部署
```bash
kubectl apply -f deploy/k8s/modelrt-secret.yaml
kubectl apply -f deploy/k8s/modelrt-configmap.yaml
kubectl apply -f deploy/k8s/modelrt-deployment.yaml
kubectl apply -f deploy/k8s/modelrt-service.yaml
```
#### 5.4 配置说明
| 配置项 | 方式 | 说明 |
| :--- | :--- | :--- |
| `postgres.password` | Secret `modelrt-secret` | 不写入 ConfigMap |
| `service.secret_key` | Secret `modelrt-secret` | 不写入 ConfigMap |
| RabbitMQ 客户端证书 | Secret `modelrt-certs` | 挂载至 `/app/configs/certs/` |
| `config.yaml` 其余配置 | ConfigMap `modelrt-config` | 所有 host 已替换为 K8s service 名 |
| `K8S_NAMESPACE` / `K8S_NODE_NAME` | Downward API | 注入至日志全局字段 |
> **注意:** `modelrt-configmap.yaml` 中的 `postgres.password` 和 `service.secret_key` 留空,实际值由容器启动时的环境变量 `POSTGRES_PASSWORD` / `SERVICE_SECRET_KEY` 注入,应用需读取这两个环境变量覆盖 config 中的空值。若应用当前仅读取文件配置,可直接将值填入 `modelrt-secret.yaml` 并在 ConfigMap 中引用,或在 ConfigMap 中直接填写。
#### 5.5 状态检查
```bash
# 查看 Pod 状态
kubectl get pods -l app=modelrt
# 查看启动日志
kubectl logs -l app=modelrt --tail=50
# 查看 Service
kubectl get svc modelrt-service
```
#### 5.6 端口汇总
| NodePort | 说明 |
| :--- | :--- |
| `30080` | ModelRT HTTP API(SSH 隧道本地端口 `8080`) |
#### 5.7 清理
```bash
kubectl delete -f deploy/k8s/modelrt-service.yaml \
-f deploy/k8s/modelrt-deployment.yaml \
-f deploy/k8s/modelrt-configmap.yaml \
-f deploy/k8s/modelrt-secret.yaml
kubectl delete secret modelrt-certs
```
### 6\. 部署可观测性栈(Kubernetes)
在 $\text{Kubernetes}$ 集群中部署 $\text{Jaeger}$(链路追踪)+ $\text{Loki + Promtail + Grafana}$(日志可视化)。所有资源部署在 `default` 命名空间,$\text{YAML}$ 文件位于 `deploy/k8s/`。
#### 6.1 部署 Jaeger
```bash
kubectl apply -f deploy/k8s/jaeger-deployment.yaml
kubectl apply -f deploy/k8s/jaeger-service.yaml
```
#### 6.2 部署 Loki
```bash
kubectl apply -f deploy/k8s/loki-configmap.yaml
kubectl apply -f deploy/k8s/loki-pvc.yaml
kubectl apply -f deploy/k8s/loki-deployment.yaml
kubectl apply -f deploy/k8s/loki-service.yaml
```
#### 6.3 部署 Promtail
```bash
kubectl apply -f deploy/k8s/promtail-rbac.yaml
kubectl apply -f deploy/k8s/promtail-configmap.yaml
kubectl apply -f deploy/k8s/promtail-daemonset.yaml
```
#### 6.4 部署 Grafana
```bash
kubectl apply -f deploy/k8s/grafana-configmap.yaml
kubectl apply -f deploy/k8s/grafana-deployment.yaml
kubectl apply -f deploy/k8s/grafana-service.yaml
```
#### 6.5 一键部署
```bash
kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
-f deploy/k8s/jaeger-service.yaml \
-f deploy/k8s/loki-configmap.yaml \
-f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/promtail-rbac.yaml \
-f deploy/k8s/promtail-configmap.yaml \
-f deploy/k8s/promtail-daemonset.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-service.yaml
```
#### 6.6 状态检查
```bash
# 查看所有 Pod 状态
kubectl get pods
# 查看所有 Service 及 NodePort
kubectl get svc
```
#### 6.7 端口汇总
| 服务 | NodePort | 访问地址 | 说明 |
| :--- | :--- | :--- | :--- |
| **Jaeger UI** | `31686` | `http://<NodeIP>:31686` | 链路追踪查询界面 |
| **Loki** | `31100` | `http://<NodeIP>:31100` | 日志 HTTP API |
| **Grafana** | `31000` | `http://<NodeIP>:31000` | 可视化界面,账号 `coslight / coslight@tj` |
| **OTLP gRPC** | `31317` | `<NodeIP>:31317` | ModelRT OTel 上报地址(gRPC) |
| **OTLP HTTP** | `31318` | `http://<NodeIP>:31318` | ModelRT OTel 上报地址(HTTP) |
#### 6.8 清理
```bash
kubectl delete -f deploy/k8s/
```
### 7\. Mac 本地访问(SSH 隧道)
$\text{ModelRT / EventRT}$ 在 $\text{Mac}$ 本地运行时,依赖的 $\text{RabbitMQ}$、$\text{Redis}$、$\text{Jaeger}$、$\text{Loki}$、$\text{Grafana}$ 均部署在 $\text{Ubuntu}$ 宿主机(`192.168.1.101`)上的 $\text{Minikube}$(`192.168.49.2`)中。由于 $\text{Minikube}$ 网络不直接对外暴露,需通过 $\text{SSH}$ 本地端口转发建立访问隧道。
#### 7.1 网络拓扑
``` text
Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101) ──▶ Minikube NodePort (192.168.49.2)
```
#### 7.2 建立隧道
```bash
ssh -L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
-L 4318:192.168.49.2:31318 \
-L 16686:192.168.49.2:31686 \
-L 3100:192.168.49.2:31100 \
-L 3000:192.168.49.2:31000 \
douxu@192.168.1.101
```
如需后台静默运行(不占用终端):
```bash
ssh -fN \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
-L 4318:192.168.49.2:31318 \
-L 16686:192.168.49.2:31686 \
-L 3100:192.168.49.2:31100 \
-L 3000:192.168.49.2:31000 \
douxu@192.168.1.101
```
#### 7.3 端口映射说明
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
| :--- | :--- | :--- | :--- |
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `https://localhost:15671` |
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |
| `4318` | `31318` | OTLP HTTP | OTel Trace 上报(Jaeger Collector) |
| `16686` | `31686` | Jaeger UI | 链路追踪查询 `http://localhost:16686` |
| `3100` | `31100` | Loki | 日志查询 API |
| `3000` | `31000` | Grafana | 可视化界面 `http://localhost:3000` |
> **注意:** 隧道建立后,本地配置文件中所有服务地址均填 `localhost:<本地端口>`,无需修改即可在 $\text{Mac}$ 上直接运行服务。
#### 7.4 关闭隧道
前台运行时直接 `Ctrl+C`;后台运行时查找并终止进程:
```bash
# 找到 ssh 隧道进程
ps aux | grep "ssh -fN"
# 终止(替换为实际 PID)
kill <PID>
```
### 8\. 后续操作(停止与清理)
#### 8.1 停止容器
```bash
docker stop postgres redis
```
#### 4.2 删除容器(删除后数据将丢失)
#### 8.2 删除容器(删除后数据将丢失)
```bash
docker rm postgres redis

View File

@ -1,19 +1,35 @@
FROM golang:1.24-alpine AS builder
RUN apk --no-cache upgrade
WORKDIR /app
COPY go.mod .
COPY go.sum .
COPY go.mod go.sum ./
RUN GOPROXY="https://goproxy.cn,direct" go mod download
COPY . .
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o modelrt main.go
RUN CGO_ENABLED=0 GOOS=linux go build \
-ldflags="-s -w" \
-trimpath \
-mod=readonly \
-o modelrt main.go
FROM alpine:latest
WORKDIR /app
# Prepare runtime dependencies in a pinned Alpine stage so they can be
# copied into scratch without pulling any vulnerable OS packages at run time.
FROM alpine:3.21 AS certs
ARG USER_ID=1000
RUN adduser -D -u ${USER_ID} modelrt
RUN apk --no-cache add ca-certificates tzdata && \
adduser -D -u ${USER_ID} modelrt
FROM scratch
# CA certificates required for TLS connections (RabbitMQ amqps://)
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Timezone data
COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo
# Non-root user/group definitions
COPY --from=certs /etc/passwd /etc/passwd
COPY --from=certs /etc/group /etc/group
WORKDIR /app
COPY --from=builder /app/modelrt ./modelrt
COPY configs/config.example.yaml ./configs/config.example.yaml
RUN chown -R modelrt:modelrt /app
RUN chmod +x /app/modelrt
USER modelrt
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]
CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"]

View File

@ -0,0 +1,26 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-datasources
namespace: default
data:
datasources.yaml: |
apiVersion: 1
datasources:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
isDefault: true
jsonData:
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
derivedFields:
- matcherRegex: '"traceID":\s*"([a-f0-9]+)"'
name: TraceID
url: http://127.0.0.1:16686/trace/$${__value.raw}
targetBlank: true
- name: Jaeger
type: jaeger
uid: jaeger
access: proxy
url: http://jaeger:16686

View File

@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: grafana
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: grafana
template:
metadata:
labels:
app: grafana
spec:
containers:
- name: grafana
image: grafana/grafana:10.4.2
ports:
- containerPort: 3000
env:
- name: GF_SECURITY_ADMIN_USER
value: "coslight"
- name: GF_SECURITY_ADMIN_PASSWORD
value: "coslight@tj"
- name: GF_AUTH_ANONYMOUS_ENABLED
value: "false"
volumeMounts:
- name: datasources
mountPath: /etc/grafana/provisioning/datasources
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 128Mi
volumes:
- name: datasources
configMap:
name: grafana-datasources

View File

@ -0,0 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: grafana
namespace: default
spec:
ports:
- name: http
port: 3000
targetPort: 3000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
selector:
app: grafana
type: NodePort

View File

@ -1,31 +1,3 @@
apiVersion: v1
kind: Service
metadata:
name: jaeger
labels:
app: jaeger
spec:
ports:
- name: ui
port: 16686
targetPort: 16686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
- name: collector-http
port: 14268
targetPort: 14268
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
- name: otlp-http
port: 4318
targetPort: 4318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
- name: otlp-grpc
port: 4317
targetPort: 4317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
selector:
app: jaeger
type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:

View File

@ -0,0 +1,27 @@
apiVersion: v1
kind: Service
metadata:
name: jaeger
labels:
app: jaeger
spec:
ports:
- name: ui
port: 16686
targetPort: 16686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
- name: collector-http
port: 14268
targetPort: 14268
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
- name: otlp-http
port: 4318
targetPort: 4318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
- name: otlp-grpc
port: 4317
targetPort: 4317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
selector:
app: jaeger
type: NodePort

View File

@ -0,0 +1,49 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: loki-config
namespace: default
data:
loki.yaml: |
auth_enabled: false
server:
http_listen_port: 3100
ingester:
wal:
enabled: true
dir: /loki/wal # 指向 PVC 挂载路径,避免在容器根目录创建 /wal 时 permission denied
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
chunk_idle_period: 5m
chunk_retain_period: 30s
schema_config:
configs:
- from: 2024-01-01
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
storage_config:
boltdb_shipper:
active_index_directory: /loki/index
cache_location: /loki/cache
shared_store: filesystem
filesystem:
directory: /loki/chunks
limits_config:
reject_old_samples: true
reject_old_samples_max_age: 168h
compactor:
working_directory: /loki/compactor
shared_store: filesystem

View File

@ -0,0 +1,45 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: loki
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: loki
template:
metadata:
labels:
app: loki
spec:
securityContext:
fsGroup: 10001 # 使 PVC 挂载目录对 Loki 默认用户UID 10001可写
runAsUser: 10001
runAsGroup: 10001
containers:
- name: loki
image: grafana/loki:2.9.4
args:
- -config.file=/etc/loki/loki.yaml
ports:
- containerPort: 3100
volumeMounts:
- name: config
mountPath: /etc/loki
- name: storage
mountPath: /loki
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 128Mi
volumes:
- name: config
configMap:
name: loki-config
- name: storage
persistentVolumeClaim:
claimName: loki-pvc

11
deploy/k8s/loki-pvc.yaml Normal file
View File

@ -0,0 +1,11 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: loki-pvc
namespace: default
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi

View File

@ -0,0 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: loki
namespace: default
spec:
ports:
- name: http
port: 3100
targetPort: 3100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
selector:
app: loki
type: NodePort

View File

@ -0,0 +1,14 @@
#!/bin/sh
# Create the modelrt client certificate secret.
# Run this script from the directory that contains the three cert files,
# or adjust the paths below to point at the actual files.
#
# Expected files (generated during RabbitMQ TLS setup):
# ca_certificate.pem
# modelrt_client_cert.pem
# modelrt_client_key.pem
kubectl create secret generic modelrt-certs \
--from-file=ca_certificate.pem=./ca_certificate.pem \
--from-file=modelrt_client_cert.pem=./modelrt_client_cert.pem \
--from-file=modelrt_client_key.pem=./modelrt_client_key.pem

View File

@ -0,0 +1,86 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: modelrt-config
data:
config.yaml: |
postgres:
host: "192.168.1.101"
port: 5432
database: "demo"
user: "postgres"
password: "" # injected via env POSTGRES_PASSWORD
rabbitmq:
ca_cert_path: "/app/configs/certs/ca_certificate.pem"
client_key_path: "/app/configs/certs/modelrt_client_key.pem"
client_key_password: ""
client_cert_path: "/app/configs/certs/modelrt_client_cert.pem"
insecure_skip_verify: false
server_name: "rabbitmq-server"
user: ""
password: ""
host: "rabbitmq-service"
port: 5671
logger:
mode: "production"
level: "info"
filepath: ""
maxsize: 100
maxbackups: 5
maxage: 30
compress: false
loki:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
otel:
endpoint: "jaeger:4318"
insecure: true
ants:
parse_concurrent_quantity: 10
rtd_receive_concurrent_quantity: 10
async_task:
worker_pool_size: 10
queue_consumer_count: 2
max_retry_count: 3
retry_initial_delay: 1s
retry_max_delay: 5m
health_check_interval: 30s
locker_redis:
addr: "redis-service:6379"
password: ""
db: 1
poolsize: 50
dial_timeout: 10
read_timeout: 10
write_timeout: 10
storage_redis:
addr: "redis-service:6379"
password: ""
db: 0
poolsize: 50
dial_timeout: 10
read_timeout: 10
write_timeout: 10
base:
grid_id: 1
zone_id: 1
station_id: 1
service:
service_addr: ":8080"
service_name: "modelRT"
secret_key: "" # injected via env SERVICE_SECRET_KEY
deploy_env: "production"
dataRT:
host: "http://127.0.0.1"
port: 8888
polling_api: "datart/getPointData"
polling_api_method: "GET"

View File

@ -0,0 +1,90 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: modelrt
labels:
app: modelrt
spec:
replicas: 1
selector:
matchLabels:
app: modelrt
template:
metadata:
labels:
app: modelrt
spec:
containers:
- name: modelrt
image: coslight/modelrt:latest
imagePullPolicy: IfNotPresent
args:
- "-modelRT_config_dir=/app/configs"
- "-modelRT_config_name=config"
- "-modelRT_config_type=yaml"
ports:
- containerPort: 8080
env:
# Downward API — injected into every log line by logger/zap.go containerFields()
- name: K8S_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
# HOSTNAME is set automatically by K8s to the pod name
# Sensitive values injected from Secret so they stay out of ConfigMap
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: modelrt-secret
key: postgres-password
- name: SERVICE_SECRET_KEY
valueFrom:
secretKeyRef:
name: modelrt-secret
key: secret-key
volumeMounts:
- name: config
mountPath: /app/configs/config.yaml
subPath: config.yaml
readOnly: true
- name: certs
mountPath: /app/configs/certs
readOnly: true
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
securityContext:
runAsUser: 1000
runAsNonRoot: true
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
livenessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
failureThreshold: 3
readinessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 3
volumes:
- name: config
configMap:
name: modelrt-config
- name: certs
secret:
secretName: modelrt-certs

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: modelrt-secret
type: Opaque
stringData:
postgres-password: "coslight"
secret-key: "modelrt_key"

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: modelrt-service
labels:
app: modelrt
spec:
type: NodePort
selector:
app: modelrt
ports:
- name: http
port: 8080
targetPort: 8080
nodePort: 30080

View File

@ -0,0 +1,52 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: promtail-config
namespace: default
data:
promtail.yaml: |
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
pipeline_stages:
# 解析 zap 输出的 JSON 日志,提取结构化字段
- json:
expressions:
level: level
traceID: traceID
spanID: spanID
caller: caller
pod: pod
namespace: namespace
node: node
# 将关键字段提升为 Loki Label,支持在 Grafana 中按实例/Trace 过滤
- labels:
level:
traceID:
pod:
namespace:
node:
relabel_configs:
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
- source_labels: [__meta_kubernetes_pod_container_name]
target_label: container
- source_labels: [__meta_kubernetes_pod_label_app]
target_label: app
# 只采集有 app label 的 Pod
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: .+

View File

@ -0,0 +1,51 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: promtail
namespace: default
spec:
selector:
matchLabels:
app: promtail
template:
metadata:
labels:
app: promtail
spec:
serviceAccountName: promtail
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
containers:
- name: promtail
image: grafana/promtail:2.9.4
args:
- -config.file=/etc/promtail/promtail.yaml
ports:
- containerPort: 9080
volumeMounts:
- name: config
mountPath: /etc/promtail
- name: varlog
mountPath: /var/log
readOnly: true
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
resources:
limits:
cpu: 200m
memory: 128Mi
requests:
cpu: 50m
memory: 64Mi
volumes:
- name: config
configMap:
name: promtail-config
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers

View File

@ -0,0 +1,27 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: promtail
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: promtail
rules:
- apiGroups: [""]
resources: ["nodes", "nodes/proxy", "services", "endpoints", "pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: promtail
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: promtail
subjects:
- kind: ServiceAccount
name: promtail
namespace: default

View File

@ -0,0 +1,33 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-config
data:
rabbitmq.conf: |
# 确保允许PLAIN认证
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.3 = EXTERNAL
# 允许admin用户通过远程方式连接
loopback_users.admin = false
# 默认心跳和监听配置可在此扩展
# 确定 ssl 连接时验证使用的用户名
ssl_cert_login_from = common_name
# 开启此项配置会导致只能通过TLS端口访问
listeners.tcp = none
listeners.ssl.default = 5671
# default user config
load_definitions = /etc/rabbitmq/definitions.json
# ssl config
ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/certs/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/certs/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# management config
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
management.ssl.certfile = /etc/rabbitmq/certs/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/certs/server_key.pem
management.ssl.verify = verify_peer
management.ssl.fail_if_no_peer_cert = true

View File

@ -0,0 +1,81 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: eventrt-rabbitmq
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:4.1.1-management-alpine
ports:
- containerPort: 4369
- containerPort: 5671
- containerPort: 5672 # AMQP
- containerPort: 15671
- containerPort: 15672 # Management UI
- containerPort: 15691
- containerPort: 15692
- containerPort: 25672
env:
- name: RABBITMQ_DEFAULT_USER
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: rabbitmq-user
- name: RABBITMQ_DEFAULT_PASS
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: rabbitmq-pass
- name: RABBITMQ_ERLANG_COOKIE
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: erlang-cookie
- name: RABBITMQ_DEFAULT_VHOST
value: "/"
volumeMounts:
- name: rabbitmq-certs-volume
mountPath: /etc/rabbitmq/certs
readOnly: true
- name: rabbitmq-config-volume
mountPath: /etc/rabbitmq/rabbitmq.conf
subPath: rabbitmq.conf
- name: rabbitmq-config-volume
mountPath: /etc/rabbitmq/advanced.config
subPath: advanced.config
readOnly: true
- name: plugins-config-volume
mountPath: /etc/rabbitmq/enabled_plugins
subPath: enabled_plugins
- name: users-config-volume
mountPath: /etc/rabbitmq/definitions.json
subPath: definitions.json
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumes:
- name: rabbitmq-certs-volume
secret:
secretName: rabbitmq-certs
- name: rabbitmq-config-volume
configMap:
name: rabbitmq-config
- name: rabbitmq-advanced-config-volume
configMap:
name: rabbitmq-config
- name: plugins-config-volume
configMap:
name: rabbit-plugins-conf
- name: users-config-volume
configMap:
name: rabbitmq-users-definitions
- name: rabbitmq-data
emptyDir: {}

View File

@ -0,0 +1,9 @@
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-secret
type: Opaque
stringData:
rabbitmq-user: "coslight"
rabbitmq-pass: "coslight@tj"
erlang-cookie: "secret-erlang-cookie"

View File

@ -0,0 +1,29 @@
# Exposes the pods labelled app: rabbitmq on four ports, each with a fixed
# node port: AMQP over TLS (5671 -> 30671), AMQP (5672 -> 30672),
# management over TLS (15671 -> 31671) and management (15672 -> 31672).
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-service
spec:
type: NodePort # NodePort makes external access convenient when running in Minikube
selector:
app: rabbitmq
ports:
- name: amqp-ssl
protocol: TCP
port: 5671
targetPort: 5671
nodePort: 30671
- name: amqp
protocol: TCP
port: 5672
targetPort: 5672
nodePort: 30672
- name: management-ssl
protocol: TCP
port: 15671
targetPort: 15671
nodePort: 31671
- name: management
protocol: TCP
port: 15672
targetPort: 15672
nodePort: 31672

View File

@ -0,0 +1,77 @@
# RabbitMQ definitions file: users, the "/" vhost, per-user permissions and
# the cluster name. Mounted by the RabbitMQ Deployment at
# /etc/rabbitmq/definitions.json.
# NOTE(review): web-client, modelrt-client and eventrt-client have an empty
# password_hash; presumably they authenticate by client certificate rather
# than by password. Confirm against the broker's auth configuration.
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-users-definitions
data:
  definitions.json: |
    {
      "users": [
        {
          "name": "coslight",
          "password_hash": "Gl2XVEJwPwDZQF8ZhsYnvm83wMkdftY3/raxyntdZueyx/Uv",
          "hashing_algorithm": "rabbit_password_hashing_sha256",
          "tags": ["administrator"]
        },
        {
          "name": "web-client",
          "password_hash": "",
          "hashing_algorithm": "rabbit_password_hashing_sha256",
          "tags": ["management"]
        },
        {
          "name": "modelrt-client",
          "password_hash": "",
          "hashing_algorithm": "rabbit_password_hashing_sha256",
          "tags": ["management"]
        },
        {
          "name": "eventrt-client",
          "password_hash": "",
          "hashing_algorithm": "rabbit_password_hashing_sha256",
          "tags": ["management"]
        }
      ],
      "vhosts": [ { "name": "/" } ],
      "permissions": [
        {
          "user": "coslight",
          "vhost": "/",
          "configure": ".*",
          "write": ".*",
          "read": ".*"
        },
        {
          "user": "web-client",
          "vhost": "/",
          "configure": "^$",
          "write": ".*",
          "read": ".*"
        },
        {
          "user": "modelrt-client",
          "vhost": "/",
          "configure": ".*",
          "write": ".*",
          "read": ".*"
        },
        {
          "user": "eventrt-client",
          "vhost": "/",
          "configure": ".*",
          "write": ".*",
          "read": ".*"
        }
      ],
      "topic_permissions": [],
      "parameters": [],
      "global_parameters": [
        {
          "name": "cluster_name",
          "value": "eventrt-rabbitmq-cluster"
        }
      ],
      "policies": [],
      "queues": [],
      "exchanges": [],
      "bindings": []
    }

View File

@ -0,0 +1,23 @@
# Single-replica Redis (redis-stack-server image) listening on 6379.
# NOTE(review): the image uses the mutable `latest` tag, so the deployed
# version can change between pulls; consider pinning a concrete tag.
# NOTE(review): only resource limits are declared (128Mi / 500m), no requests,
# and no volume is mounted; confirm this is intended.
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis/redis-stack-server:latest
resources:
limits:
memory: "128Mi"
cpu: "500m"
ports:
- containerPort: 6379

View File

@ -0,0 +1,13 @@
# Exposes the pods labelled app: redis on port 6379, published on every node
# at the fixed node port 30001.
apiVersion: v1
kind: Service
metadata:
name: redis-service
spec:
type: NodePort
selector:
app: redis
ports:
- port: 6379
targetPort: 6379
nodePort: 30001

View File

@ -7,6 +7,8 @@ import (
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/mq"
"modelRT/mq/event"
"modelRT/orm"
"github.com/gin-gonic/gin"
@ -74,6 +76,10 @@ func AsyncTaskCancelHandler(c *gin.Context) {
return
}
if record, evtErr := event.NewTaskCancelledMessage(taskID.String(), string(asyncTask.TaskType)); evtErr == nil {
mq.TryEmitMessage(ctx, record)
}
err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40009, "task cancelled by user", orm.JSONMap{
"cancelled_at": timestamp,
"cancelled_by": "user",

View File

@ -5,6 +5,8 @@ import (
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/mq"
"modelRT/mq/event"
"modelRT/network"
"modelRT/orm"
"modelRT/task"
@ -77,6 +79,10 @@ func AsyncTaskCreateHandler(c *gin.Context) {
task.TaskMsgChan <- msg
logger.Info(ctx, "task enqueued to channel", "task_id", asyncTask.TaskID, "queue", constants.TaskQueueName)
if record, err := event.NewTaskSubmittedMessage(asyncTask.TaskID.String(), request.TaskType, 5); err == nil {
mq.TryEmitMessage(ctx, record)
}
logger.Info(ctx, "async task created success", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
// return success response

View File

@ -188,6 +188,8 @@ func main() {
// async push event to rabbitMQ
go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan)
// async push message (task state changes etc.) to rabbitMQ
go mq.PushMessageToRabbitMQ(ctx, mq.MessageMsgChan)
// async push task message to rabbitMQ
go task.PushTaskToRabbitMQ(ctx, modelRTConfig.RabbitMQConfig, task.TaskMsgChan)

28
mq/emit.go Normal file
View File

@ -0,0 +1,28 @@
// Package mq provides read or write access to message queue services
package mq
import (
"context"
"modelRT/logger"
"modelRT/mq/event"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// TryEmitMessage attempts a non-blocking push of record onto MessageMsgChan.
//
// The trace context carried by ctx is injected into a string map through the
// globally registered OTel text-map propagator and queued together with the
// record as an EventMessage.
//
// A nil record is ignored with a warning: it carries nothing to publish and
// would be dereferenced by the channel-full warning below.
// If the channel is full the message is dropped and a warning is logged.
func TryEmitMessage(ctx context.Context, record *event.EventRecord) {
	if record == nil {
		logger.Warn(ctx, "nil message record, message dropped")
		return
	}

	carrier := make(map[string]string)
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier))

	msg := &EventMessage{Record: record, TraceCarrier: carrier}
	select {
	case MessageMsgChan <- msg:
	default:
		// Never block the caller (request handlers, task worker): drop instead.
		logger.Warn(ctx, "message channel full, message dropped",
			"event_uuid", record.EventUUID,
			"category", record.Category,
		)
	}
}

View File

@ -0,0 +1,81 @@
// Package event defines real-time data event operation functions
package event
import (
"modelRT/constants"
)
// NewTaskSubmittedMessage builds the message record emitted once a task has
// been submitted to the queue. The task id, task type and priority are passed
// to the record through WithCondition, and the record is categorised as
// constants.MessageTaskSubmittedCategory.
func NewTaskSubmittedMessage(taskID, taskType string, priority int) (*EventRecord, error) {
	condition := map[string]any{
		"task_id":   taskID,
		"task_type": taskType,
		"priority":  priority,
	}

	return NewPlatformEventRecord(
		int(constants.EventGeneralPlatformSoft),
		0,
		"async_task_submitted",
		WithCategory(constants.MessageTaskSubmittedCategory),
		WithCondition(condition),
	)
}
// NewTaskRunningMessage builds the message record emitted when a task starts
// executing. The task id and task type are passed to the record through
// WithCondition, and the record is categorised as
// constants.MessageTaskRunningCategory.
func NewTaskRunningMessage(taskID, taskType string) (*EventRecord, error) {
	condition := map[string]any{
		"task_id":   taskID,
		"task_type": taskType,
	}

	return NewPlatformEventRecord(
		int(constants.EventGeneralPlatformSoft),
		0,
		"async_task_running",
		WithCategory(constants.MessageTaskRunningCategory),
		WithCondition(condition),
	)
}
// NewTaskCompletedMessage builds the message record emitted when a task
// finishes successfully. The task id and task type are passed through
// WithCondition; executionMs is passed through WithResult under the
// "execution_ms" key. The record is categorised as
// constants.MessageTaskCompletedCategory.
func NewTaskCompletedMessage(taskID, taskType string, executionMs int64) (*EventRecord, error) {
	condition := map[string]any{
		"task_id":   taskID,
		"task_type": taskType,
	}
	result := map[string]any{
		"execution_ms": executionMs,
	}

	return NewPlatformEventRecord(
		int(constants.EventGeneralPlatformSoft),
		0,
		"async_task_completed",
		WithCategory(constants.MessageTaskCompletedCategory),
		WithCondition(condition),
		WithResult(result),
	)
}
// NewTaskFailedMessage builds the message record emitted when a task fails
// during execution. The task id, task type and failure reason are passed to
// the record through WithCondition, and the record is categorised as
// constants.MessageTaskFailedCategory.
func NewTaskFailedMessage(taskID, taskType, reason string) (*EventRecord, error) {
	condition := map[string]any{
		"task_id":   taskID,
		"task_type": taskType,
		"reason":    reason,
	}

	return NewPlatformEventRecord(
		int(constants.EventGeneralPlatformSoft),
		0,
		"async_task_failed",
		WithCategory(constants.MessageTaskFailedCategory),
		WithCondition(condition),
	)
}
// NewTaskCancelledMessage builds the message record emitted when a user
// cancels a task. The task id and task type are passed to the record through
// WithCondition, and the record is categorised as
// constants.MessageTaskCancelledCategory.
func NewTaskCancelledMessage(taskID, taskType string) (*EventRecord, error) {
	condition := map[string]any{
		"task_id":   taskID,
		"task_type": taskType,
	}

	return NewPlatformEventRecord(
		int(constants.EventGeneralPlatformSoft),
		0,
		"async_task_cancelled",
		WithCategory(constants.MessageTaskCancelledCategory),
		WithCondition(condition),
	)
}

149
mq/publish_message.go Normal file
View File

@ -0,0 +1,149 @@
// Package mq provides read or write access to message queue services
package mq
import (
"context"
"encoding/json"
"time"
"modelRT/constants"
"modelRT/logger"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// MessageMsgChan is the in-process buffer (capacity 10000) holding message
// records until they are published asynchronously to the message exchange.
var MessageMsgChan = make(chan *EventMessage, 10000)
// initMessageChannel opens a new AMQP channel on the shared connection and
// prepares the message topology, in this order:
//
//  1. the durable dead-letter topic exchange, its durable queue and their binding;
//  2. the durable message topic exchange;
//  3. the durable message queue, declared with x-dead-letter-exchange and
//     x-dead-letter-routing-key arguments pointing at the dead-letter
//     exchange, and its binding to the message exchange;
//  4. publisher-confirm mode on the channel.
//
// On success the caller owns the returned channel and is responsible for
// closing it. If any step fails the error is logged and returned, and the
// partially configured channel is closed here so it does not leak.
func initMessageChannel(ctx context.Context) (*amqp.Channel, error) {
	channel, err := GetConn().Channel()
	if err != nil {
		logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
		return nil, err
	}

	// Release the channel on every failure path below; only a fully
	// configured channel is handed to the caller.
	ready := false
	defer func() {
		if !ready {
			_ = channel.Close() // best effort: the setup error is the one reported
		}
	}()

	err = channel.ExchangeDeclare(constants.MessageDeadExchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare message dead letter exchange failed", "error", err)
		return nil, err
	}

	_, err = channel.QueueDeclare(constants.MessageDeadQueueName, true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare message dead letter queue failed", "error", err)
		return nil, err
	}

	err = channel.QueueBind(constants.MessageDeadQueueName, constants.MessageDeadRoutingKey, constants.MessageDeadExchangeName, false, nil)
	if err != nil {
		logger.Error(ctx, "bind message dead letter queue failed", "error", err)
		return nil, err
	}

	err = channel.ExchangeDeclare(constants.MessageExchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare message exchange failed", "error", err)
		return nil, err
	}

	// Dead-letter arguments for the main queue, pointing at the exchange and
	// routing key declared above.
	args := amqp.Table{
		"x-dead-letter-exchange":    constants.MessageDeadExchangeName,
		"x-dead-letter-routing-key": constants.MessageDeadRoutingKey,
	}
	_, err = channel.QueueDeclare(constants.MessageQueueName, true, false, false, false, args)
	if err != nil {
		logger.Error(ctx, "declare message queue failed", "error", err)
		return nil, err
	}

	err = channel.QueueBind(constants.MessageQueueName, constants.MessageRoutingKey, constants.MessageExchangeName, false, nil)
	if err != nil {
		logger.Error(ctx, "bind message queue failed", "error", err)
		return nil, err
	}

	if err = channel.Confirm(false); err != nil {
		logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
		return nil, err
	}

	ready = true
	return channel, nil
}
// PushMessageToRabbitMQ publishes message records from msgChan to the message exchange.
// The category of each record is used as the routing key so consumers can bind selectively.
//
// It blocks until ctx is cancelled or msgChan is closed, so it is meant to be
// run in its own goroutine. If the AMQP channel cannot be initialised it logs
// the error and returns without consuming msgChan.
//
// For every message the trace context stored in TraceCarrier is restored and
// re-injected into the AMQP headers. Each publish is bounded by a 5 second
// timeout; publish errors and negative broker confirmations are logged and
// the message is not retried. Nil messages and messages without a record are
// skipped with a warning.
//
// NOTE(review): nothing here re-opens the AMQP channel; if it is closed
// underneath this loop, subsequent publishes will presumably keep failing
// until the process restarts.
func PushMessageToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) {
	channel, err := initMessageChannel(ctx)
	if err != nil {
		logger.Error(ctx, "initializing message rabbitMQ channel failed", "error", err)
		return
	}
	// Single cleanup point for every exit path below.
	defer channel.Close()

	confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
	go func() {
		// Drain until the library closes the listener (on channel shutdown).
		// Stopping earlier, e.g. on ctx cancellation, could leave in-flight
		// confirmations undelivered and block the channel while it is closing.
		for confirm := range confirms {
			if !confirm.Ack {
				logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "push message to rabbitMQ stopped by context cancel")
			return
		case msg, ok := <-msgChan:
			if !ok {
				logger.Info(ctx, "push message to rabbitMQ stopped by msgChan closed")
				return
			}
			// A nil record would marshal to "null" and then panic when its
			// category is read for the routing key.
			if msg == nil || msg.Record == nil {
				logger.Warn(ctx, "empty message record received, message skipped")
				continue
			}

			record := msg.Record
			recordBytes, err := json.Marshal(record)
			if err != nil {
				logger.Error(ctx, "marshal message record failed", "event_uuid", record.EventUUID, "error", err)
				continue
			}

			// Restore the producer's trace context and forward it in the AMQP headers.
			msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
			headers := amqp.Table{}
			otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers))

			routingKey := record.Category
			pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
			err = channel.PublishWithContext(pubCtx,
				constants.MessageExchangeName,
				routingKey,
				false,
				false,
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        recordBytes,
					Headers:     headers,
				})
			// Cancel right away rather than defer: this loop runs for the
			// lifetime of the process.
			cancel()

			if err != nil {
				logger.Error(ctx, "publish message to rabbitMQ failed", "routing_key", routingKey, "error", err)
			}
		}
	}
}

View File

@ -13,6 +13,7 @@ import (
"modelRT/database"
"modelRT/logger"
"modelRT/mq"
"modelRT/mq/event"
"modelRT/orm"
"github.com/gofrs/uuid"
@ -314,6 +315,10 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
return
}
if record, err := event.NewTaskRunningMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType)); err == nil {
mq.TryEmitMessage(ctx, record)
}
// Execute task using handler
startTime := time.Now()
err := w.dispatch(ctx, taskMsg.TaskType, taskMsg.TaskID, taskMsg.Params, &msg)
@ -332,6 +337,10 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
logger.Error(ctx, "Failed to update task with error", "error", updateErr)
}
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
mq.TryEmitMessage(ctx, record)
}
// Ack message even if task failed (we don't want to retry indefinitely)
msg.Ack(false)
w.metrics.mu.Lock()
@ -347,6 +356,10 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
// Still ack the message since task was processed successfully
}
if record, err := event.NewTaskCompletedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), processingTime.Milliseconds()); err == nil {
mq.TryEmitMessage(ctx, record)
}
// Acknowledge message
msg.Ack(false)