From 42956d17935429df5560ef2921fe38d7d5624ee4 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 13 May 2026 16:58:36 +0800 Subject: [PATCH] feat: add dedicated message-exchange for task lifecycle notifications - add constants/message.go with MessageTask* categories and message-exchange / message-queue / dead-letter routing constants - add mq/publish_message.go with PushMessageToRabbitMQ (confirm mode, dead-letter queue) separate from the existing event-exchange publisher - add mq/emit.go with TryEmitMessage for non-blocking, OTel-traced dispatch - add mq/event/task_event_gen.go with NewTaskSubmitted/Running/Completed/ Failed/CancelledMessage constructors - wire TryEmitMessage into task worker and create/cancel handlers so all 5 lifecycle transitions are published (previously task.* routed to event-exchange with no matching binding, causing silent drops) - harden Dockerfile: scratch final image, pinned alpine:3.21 certs stage, apk upgrade in builder, add -trimpath -mod=readonly go build flags - add full K8s manifests under deploy/k8s/ for Redis, RabbitMQ (mTLS), ModelRT (Downward API, scratch image, readOnlyRootFilesystem), Jaeger, Loki, Promtail, Grafana - expand deploy.md with async_task SQL schema, TLS cert generation steps, K8s deployment procedures, and SSH tunnel configuration --- constants/event.go | 7 + constants/message.go | 33 ++ deploy/deploy.md | 506 +++++++++++++++++- deploy/dockerfile/modelrt.Dockerfile | 34 +- deploy/k8s/grafana-configmap.yaml | 26 + deploy/k8s/grafana-deployment.yaml | 41 ++ deploy/k8s/grafana-service.yaml | 14 + .../jaeger-deployment.yaml} | 28 - deploy/k8s/jaeger-service.yaml | 27 + deploy/k8s/loki-configmap.yaml | 49 ++ deploy/k8s/loki-deployment.yaml | 45 ++ deploy/k8s/loki-pvc.yaml | 11 + deploy/k8s/loki-service.yaml | 14 + deploy/k8s/modelrt-certs-secret.sh | 14 + deploy/k8s/modelrt-configmap.yaml | 86 +++ deploy/k8s/modelrt-deployment.yaml | 90 ++++ deploy/k8s/modelrt-secret.yaml | 8 + deploy/k8s/modelrt-service.yaml | 15 + 
deploy/k8s/promtail-configmap.yaml | 52 ++ deploy/k8s/promtail-daemonset.yaml | 51 ++ deploy/k8s/promtail-rbac.yaml | 27 + deploy/k8s/rabbitmq-config.yaml | 33 ++ deploy/k8s/rabbitmq-deployment.yaml | 81 +++ deploy/k8s/rabbitmq-secret.yaml | 9 + deploy/k8s/rabbitmq-service.yaml | 29 + deploy/k8s/rabbitmq-users-config.yaml | 77 +++ deploy/k8s/redis-deployment.yaml | 23 + deploy/k8s/redis-service.yaml | 13 + handler/async_task_cancel_handler.go | 6 + handler/async_task_create_handler.go | 6 + main.go | 2 + mq/emit.go | 28 + mq/event/task_event_gen.go | 81 +++ mq/publish_message.go | 149 ++++++ task/worker.go | 13 + 35 files changed, 1688 insertions(+), 40 deletions(-) create mode 100644 constants/message.go create mode 100644 deploy/k8s/grafana-configmap.yaml create mode 100644 deploy/k8s/grafana-deployment.yaml create mode 100644 deploy/k8s/grafana-service.yaml rename deploy/{jaeger.yaml => k8s/jaeger-deployment.yaml} (52%) create mode 100644 deploy/k8s/jaeger-service.yaml create mode 100644 deploy/k8s/loki-configmap.yaml create mode 100644 deploy/k8s/loki-deployment.yaml create mode 100644 deploy/k8s/loki-pvc.yaml create mode 100644 deploy/k8s/loki-service.yaml create mode 100644 deploy/k8s/modelrt-certs-secret.sh create mode 100644 deploy/k8s/modelrt-configmap.yaml create mode 100644 deploy/k8s/modelrt-deployment.yaml create mode 100644 deploy/k8s/modelrt-secret.yaml create mode 100644 deploy/k8s/modelrt-service.yaml create mode 100644 deploy/k8s/promtail-configmap.yaml create mode 100644 deploy/k8s/promtail-daemonset.yaml create mode 100644 deploy/k8s/promtail-rbac.yaml create mode 100644 deploy/k8s/rabbitmq-config.yaml create mode 100644 deploy/k8s/rabbitmq-deployment.yaml create mode 100644 deploy/k8s/rabbitmq-secret.yaml create mode 100644 deploy/k8s/rabbitmq-service.yaml create mode 100644 deploy/k8s/rabbitmq-users-config.yaml create mode 100644 deploy/k8s/redis-deployment.yaml create mode 100644 deploy/k8s/redis-service.yaml create mode 100644 mq/emit.go 
create mode 100644 mq/event/task_event_gen.go create mode 100644 mq/publish_message.go diff --git a/constants/event.go b/constants/event.go index 11dbf29..293ca25 100644 --- a/constants/event.go +++ b/constants/event.go @@ -88,3 +88,10 @@ const ( // EventCriticalUpDownLimitCategroy define category for critical up and down limit event EventCriticalUpDownLimitCategroy = "event.critical.updown.limit" ) + +const ( + // EventTaskGeneralTestCategory define category for test task event + EventTaskGeneralTestCategory = "event.general.task.test" + // EventTaskGeneralTopologyAnalyzeCategory define category for topology analyze task event + EventTaskGeneralTopologyAnalyzeCategory = "event.general.task.topology_analyze" +) diff --git a/constants/message.go b/constants/message.go new file mode 100644 index 0000000..7b6fb1f --- /dev/null +++ b/constants/message.go @@ -0,0 +1,33 @@ +// Package constants define constant variable +package constants + +const ( + // MessageExchangeName define exchange name for message + MessageExchangeName = "message-exchange" + // MessageDeadExchangeName define dead letter exchange name for message + MessageDeadExchangeName = "message-dead-letter-exchange" +) + +const ( + // MessageRoutingKey define binding routing key pattern for the message queue (matches all message.* categories) + MessageRoutingKey = "message.#" + // MessageDeadRoutingKey define binding routing key for the message dead letter queue + MessageDeadRoutingKey = "#" + // MessageQueueName define queue name for message + MessageQueueName = "message-queue" + // MessageDeadQueueName define dead letter queue name for message + MessageDeadQueueName = "message-dead-letter-queue" +) + +const ( + // MessageTaskSubmittedCategory define category for task submitted message + MessageTaskSubmittedCategory = "message.task.submitted" + // MessageTaskRunningCategory define category for task running message + MessageTaskRunningCategory = "message.task.running" + // MessageTaskCompletedCategory define 
category for task completed message + MessageTaskCompletedCategory = "message.task.completed" + // MessageTaskFailedCategory define category for task failed message + MessageTaskFailedCategory = "message.task.failed" + // MessageTaskCancelledCategory define category for task cancelled message + MessageTaskCancelledCategory = "message.task.cancelled" +) diff --git a/deploy/deploy.md b/deploy/deploy.md index 0080b23..8930c91 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -45,6 +45,68 @@ docker ps -a |grep postgres docker logs postgres ``` +#### 1.4 初始化异步任务表 + +$\text{PostgreSQL}$ 启动后执行以下建表语句,创建异步任务系统所需的两张表: + +```sql +-- ========================================== +-- 表: async_task +-- 说明: 存储异步任务的生命周期跟踪信息 +-- ========================================== +CREATE TABLE IF NOT EXISTS async_task ( + task_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL, + params JSONB, + created_at BIGINT NOT NULL, + finished_at BIGINT, + started_at BIGINT, + execution_time BIGINT, + progress INTEGER, + retry_count INTEGER DEFAULT 0, + max_retry_count INTEGER DEFAULT 3, + next_retry_time BIGINT, + retry_delay INTEGER DEFAULT 5000, + priority INTEGER DEFAULT 5, + queue_name VARCHAR(100) DEFAULT 'default', + worker_id VARCHAR(50), + failure_reason TEXT, + stack_trace TEXT, + created_by VARCHAR(100) +); + +CREATE INDEX IF NOT EXISTS idx_async_task_task_type ON async_task(task_type); +CREATE INDEX IF NOT EXISTS idx_async_task_status ON async_task(status); +CREATE INDEX IF NOT EXISTS idx_async_task_created_at ON async_task(created_at); +CREATE INDEX IF NOT EXISTS idx_async_task_finished_at ON async_task(finished_at); +CREATE INDEX IF NOT EXISTS idx_async_task_started_at ON async_task(started_at); +CREATE INDEX IF NOT EXISTS idx_async_task_next_retry_time ON async_task(next_retry_time); +CREATE INDEX IF NOT EXISTS idx_async_task_priority ON async_task(priority); +CREATE INDEX IF NOT EXISTS idx_async_task_status_retry ON 
async_task(status, next_retry_time) + WHERE status = 'FAILED' AND next_retry_time IS NOT NULL; + +-- ========================================== +-- 表: async_task_result +-- 说明: 存储异步任务的执行结果 +-- ========================================== +CREATE TABLE IF NOT EXISTS async_task_result ( + task_id UUID PRIMARY KEY, + result JSONB, + error_code INTEGER, + error_message TEXT, + error_detail JSONB, + execution_time BIGINT NOT NULL DEFAULT 0, + memory_usage BIGINT, + cpu_usage DOUBLE PRECISION, + retry_count INTEGER DEFAULT 0, + completed_at BIGINT NOT NULL +); + +COMMENT ON TABLE async_task IS '异步任务生命周期跟踪表'; +COMMENT ON TABLE async_task_result IS '异步任务执行结果表'; +``` + ### 2\. 部署 Redis Stack Server 我们将使用 `redis/redis-stack-server:latest` 镜像该镜像内置了 $\text{Redisearch}$ 模块,用于 $\text{ModelRT}$ 项目中补全功能 @@ -401,15 +463,453 @@ go build -o model-rt main.go 在发现控制台输出如下信息`starting ModelRT server` 后即代表服务启动成功 -### 4\. 后续操作(停止与清理) +### 4\. 部署基础依赖(Kubernetes) -#### 4.1 停止容器 +Redis 和 RabbitMQ 部署在 Minikube 中,YAML 文件位于 `deploy/k8s/`。RabbitMQ 启用双向 TLS(mTLS),客户端以 X.509 证书的 CN 字段作为用户名进行认证。 + +#### 4.1 部署 Redis + +```bash +kubectl apply -f deploy/k8s/redis-deployment.yaml +kubectl apply -f deploy/k8s/redis-service.yaml +``` + +| 参数 | 值 | 说明 | +| :--- | :--- | :--- | +| **镜像** | `redis/redis-stack-server:latest` | 内置 Redisearch 模块 | +| **NodePort** | `30001` | 集群外访问端口 | + +#### 4.2 RabbitMQ TLS 证书生成 + +RabbitMQ 配置为仅允许 TLS 连接(`listeners.tcp = none`),所有客户端须持有由同一 CA 签发的证书。 + +##### 4.2.1 生成根 CA + +```bash +# 克隆 tls-gen 工具 +git clone https://github.com/rabbitmq/tls-gen.git +cd tls-gen/basic + +# 生成根 CA(结果在 result/ 目录) +make CN=rabbitmq-server +# ca_certificate.pem 和 ca_key.pem 生成于 result/ +``` + +##### 4.2.2 生成服务器证书 + +服务器证书需包含 SAN(Subject Alternative Name),使其同时匹配集群内 DNS 和 Minikube IP。 + +创建 `server.cnf`: + +```text +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = rabbitmq-server + +[v3_server] +keyUsage = 
critical, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth, clientAuth +subjectAltName = @alt_names + +[alt_names] +DNS.1 = rabbitmq-server +DNS.2 = rabbitmq-service.default.svc.cluster.local +DNS.3 = localhost +IP.1 = 192.168.49.2 +IP.2 = 127.0.0.1 +``` + +生成证书: + +```bash +# 将 ca_certificate.pem 和 ca_key.pem(即 cakey.pem)放在当前目录 +openssl genrsa -out server_key.pem 2048 + +openssl req -new -key server_key.pem -out server_cert.csr -config server.cnf + +openssl x509 -req -in server_cert.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out server_certificate.pem -days 730 -sha256 \ + -extfile server.cnf -extensions v3_server + +rm server_cert.csr +``` + +##### 4.2.3 生成 ModelRT 客户端证书 + +CN 必须与 RabbitMQ 中注册的用户名一致(`modelrt-client`)。 + +创建 `modelrt.cnf`: + +```text +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = modelrt-client + +[v3_client] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth +``` + +生成证书: + +```bash +openssl genrsa -out modelrt_client_key.pem 2048 + +openssl req -new -key modelrt_client_key.pem \ + -out modelrt_client.csr -config modelrt.cnf + +openssl x509 -req -in modelrt_client.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out modelrt_client_cert.pem -days 365 \ + -extensions v3_client -extfile modelrt.cnf + +rm modelrt_client.csr +``` + +##### 4.2.4 生成 EventRT 客户端证书 + +创建 `eventrt.cnf`(CN 改为 `eventrt-client`): + +```text +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +C = CN +ST = Beijing +L = Beijing +O = coslight +CN = eventrt-client + +[v3_client] +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth +``` + +生成证书: + +```bash +openssl genrsa -out eventrt_client_key.pem 2048 + +openssl req -new -key eventrt_client_key.pem \ + -out eventrt_client.csr -config eventrt.cnf + +openssl 
x509 -req -in eventrt_client.csr \ + -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \ + -out eventrt_client_cert.pem -days 365 \ + -extensions v3_client -extfile eventrt.cnf + +rm eventrt_client.csr +``` + +##### 4.2.5 验证证书 + +```bash +# 验证服务器证书 +openssl verify -CAfile ca_certificate.pem server_certificate.pem + +# 验证客户端证书 +openssl verify -CAfile ca_certificate.pem modelrt_client_cert.pem +openssl verify -CAfile ca_certificate.pem eventrt_client_cert.pem + +# 查看证书详情(确认 CN 和 SAN) +openssl x509 -in server_certificate.pem -noout -subject -ext subjectAltName +openssl x509 -in modelrt_client_cert.pem -noout -subject +openssl x509 -in eventrt_client_cert.pem -noout -subject +``` + +#### 4.3 部署 RabbitMQ + +##### 4.3.1 创建证书 Secret + +将服务器端三个证书文件打包为 K8s Secret(在证书文件所在目录执行): + +```bash +kubectl create secret generic rabbitmq-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=server_certificate.pem=./server_certificate.pem \ + --from-file=server_key.pem=./server_key.pem +``` + +##### 4.3.2 部署 + +```bash +kubectl apply -f deploy/k8s/rabbitmq-secret.yaml +kubectl apply -f deploy/k8s/rabbitmq-config.yaml +kubectl apply -f deploy/k8s/rabbitmq-users-config.yaml +kubectl apply -f deploy/k8s/rabbitmq-deployment.yaml +kubectl apply -f deploy/k8s/rabbitmq-service.yaml +``` + +##### 4.3.3 端口汇总 + +| 端口 | NodePort | 说明 | +| :--- | :--- | :--- | +| `5671` | `30671` | AMQP over TLS(客户端连接) | +| `5672` | `30672` | AMQP 明文(因 `listeners.tcp = none` 当前未监听,仅预留端口) | +| `15671` | `31671` | Management UI over TLS | +| `15672` | `31672` | Management UI 明文(内部备用) | + +##### 4.3.4 用户与权限说明 + +用户定义在 `rabbitmq-users-config.yaml` 的 `definitions.json` 中,通过 `load_definitions` 启动时自动加载: + +| 用户 | 认证方式 | 权限 | 说明 | +| :--- | :--- | :--- | :--- | +| `coslight` | 密码 | administrator | 管理员,密码在 rabbitmq-secret.yaml | +| `modelrt-client` | X.509 证书(CN) | configure/read/write | ModelRT 服务专用 | +| `eventrt-client` | X.509 证书(CN) | configure/read/write | EventRT 服务专用 | +| `web-client` | X.509 证书(CN) | read/write 
| Web 客户端 | + +> **注意:** 证书认证用户的 `password_hash` 留空;RabbitMQ 通过 `ssl_cert_login_from = common_name` 将证书 CN 映射为用户名。 + +### 5\. 部署 ModelRT(Kubernetes) + +所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。 + +#### 5.1 构建并推送镜像 + +```bash +# 在项目根目录执行 +docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest . + +# 推送到镜像仓库(或直接加载到 Minikube) +minikube image load coslight/modelrt:latest +``` + +#### 5.2 创建客户端证书 Secret + +在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行: + +```bash +sh deploy/k8s/modelrt-certs-secret.sh +``` + +该脚本等价于: + +```bash +kubectl create secret generic modelrt-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=modelrt_client_cert.pem=./modelrt_client_cert.pem \ + --from-file=modelrt_client_key.pem=./modelrt_client_key.pem +``` + +#### 5.3 部署 + +```bash +kubectl apply -f deploy/k8s/modelrt-secret.yaml +kubectl apply -f deploy/k8s/modelrt-configmap.yaml +kubectl apply -f deploy/k8s/modelrt-deployment.yaml +kubectl apply -f deploy/k8s/modelrt-service.yaml +``` + +#### 5.4 配置说明 + +| 配置项 | 方式 | 说明 | +| :--- | :--- | :--- | +| `postgres.password` | Secret `modelrt-secret` | 不写入 ConfigMap | +| `service.secret_key` | Secret `modelrt-secret` | 不写入 ConfigMap | +| RabbitMQ 客户端证书 | Secret `modelrt-certs` | 挂载至 `/app/configs/certs/` | +| `config.yaml` 其余配置 | ConfigMap `modelrt-config` | 所有 host 已替换为 K8s service 名 | +| `K8S_NAMESPACE` / `K8S_NODE_NAME` | Downward API | 注入至日志全局字段 | + +> **注意:** `modelrt-configmap.yaml` 中 `postgres.password` 和 `service.secret_key` 留空,实际值由容器启动时的环境变量 `POSTGRES_PASSWORD` / `SERVICE_SECRET_KEY` 注入,应用需读取这两个环境变量覆盖 config 中的空值。若应用当前仅读取文件配置,可直接将值填入 `modelrt-secret.yaml` 并在 ConfigMap 中引用,或在 ConfigMap 中直接填写。 + +#### 5.5 状态检查 + +```bash +# 查看 Pod 状态 +kubectl get pods -l app=modelrt + +# 查看启动日志 +kubectl logs -l app=modelrt --tail=50 + +# 查看 Service +kubectl get svc modelrt-service +``` + +#### 5.6 端口汇总 + +| NodePort | 说明 | +| :--- | :--- | +| `30080` | ModelRT HTTP API,SSH 隧道本地端口 `8080` | + +#### 5.7 清理 
+ +```bash +kubectl delete -f deploy/k8s/modelrt-service.yaml \ + -f deploy/k8s/modelrt-deployment.yaml \ + -f deploy/k8s/modelrt-configmap.yaml \ + -f deploy/k8s/modelrt-secret.yaml +kubectl delete secret modelrt-certs +``` + +### 6\. 部署可观测性栈(Kubernetes) + +在 $\text{Kubernetes}$ 集群中部署 $\text{Jaeger}$(链路追踪)+ $\text{Loki + Promtail + Grafana}$(日志可视化)。所有资源部署在 `default` 命名空间,$\text{YAML}$ 文件位于 `deploy/k8s/`。 + +#### 6.1 部署 Jaeger + +```bash +kubectl apply -f deploy/k8s/jaeger-deployment.yaml +kubectl apply -f deploy/k8s/jaeger-service.yaml +``` + +#### 6.2 部署 Loki + +```bash +kubectl apply -f deploy/k8s/loki-configmap.yaml +kubectl apply -f deploy/k8s/loki-pvc.yaml +kubectl apply -f deploy/k8s/loki-deployment.yaml +kubectl apply -f deploy/k8s/loki-service.yaml +``` + +#### 6.3 部署 Promtail + +```bash +kubectl apply -f deploy/k8s/promtail-rbac.yaml +kubectl apply -f deploy/k8s/promtail-configmap.yaml +kubectl apply -f deploy/k8s/promtail-daemonset.yaml +``` + +#### 6.4 部署 Grafana + +```bash +kubectl apply -f deploy/k8s/grafana-configmap.yaml +kubectl apply -f deploy/k8s/grafana-deployment.yaml +kubectl apply -f deploy/k8s/grafana-service.yaml +``` + +#### 6.5 一键部署 + +```bash +kubectl apply -f deploy/k8s/jaeger-deployment.yaml \ + -f deploy/k8s/jaeger-service.yaml \ + -f deploy/k8s/loki-configmap.yaml \ + -f deploy/k8s/loki-pvc.yaml \ + -f deploy/k8s/loki-deployment.yaml \ + -f deploy/k8s/loki-service.yaml \ + -f deploy/k8s/promtail-rbac.yaml \ + -f deploy/k8s/promtail-configmap.yaml \ + -f deploy/k8s/promtail-daemonset.yaml \ + -f deploy/k8s/grafana-configmap.yaml \ + -f deploy/k8s/grafana-deployment.yaml \ + -f deploy/k8s/grafana-service.yaml +``` + +#### 6.6 状态检查 + +```bash +# 查看所有 Pod 状态 +kubectl get pods + +# 查看所有 Service 及 NodePort +kubectl get svc +``` + +#### 6.7 端口汇总 + +| 服务 | NodePort | 访问地址 | 说明 | +| :--- | :--- | :--- | :--- | +| **Jaeger UI** | `31686` | `http://<NodeIP>:31686` | 链路追踪查询界面 | +| **Loki** | `31100` | `http://<NodeIP>:31100` | 日志 HTTP API | +| **Grafana** | 
`31000` | `http://<NodeIP>:31000` | 可视化界面,账号 `coslight / coslight@tj` | +| **OTLP gRPC** | `31317` | `<NodeIP>:31317` | ModelRT OTel 上报地址(gRPC) | +| **OTLP HTTP** | `31318` | `http://<NodeIP>:31318` | ModelRT OTel 上报地址(HTTP) | + +#### 6.8 清理(将删除 `deploy/k8s/` 下全部资源,含 Redis / RabbitMQ / ModelRT) + +```bash +kubectl delete -f deploy/k8s/ +``` + +### 7\. Mac 本地访问(SSH 隧道) + +$\text{ModelRT / EventRT}$ 在 $\text{Mac}$ 本地运行时,依赖的 $\text{RabbitMQ}$、$\text{Redis}$、$\text{Jaeger}$、$\text{Loki}$、$\text{Grafana}$ 均部署在 $\text{Ubuntu}$ 宿主机(`192.168.1.101`)上的 $\text{Minikube}$(`192.168.49.2`)中。由于 $\text{Minikube}$ 网络不直接对外暴露,需通过 $\text{SSH}$ 本地端口转发建立访问隧道。 + +#### 7.1 网络拓扑 + +``` text +Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101) ──▶ Minikube NodePort (192.168.49.2) +``` + +#### 7.2 建立隧道 + +```bash +ssh -L 5671:192.168.49.2:30671 \ + -L 15671:192.168.49.2:31671 \ + -L 6379:192.168.49.2:30001 \ + -L 4318:192.168.49.2:31318 \ + -L 16686:192.168.49.2:31686 \ + -L 3100:192.168.49.2:31100 \ + -L 3000:192.168.49.2:31000 \ + douxu@192.168.1.101 +``` + +如需后台静默运行(不占用终端): + +```bash +ssh -fN \ + -L 5671:192.168.49.2:30671 \ + -L 15671:192.168.49.2:31671 \ + -L 6379:192.168.49.2:30001 \ + -L 4318:192.168.49.2:31318 \ + -L 16686:192.168.49.2:31686 \ + -L 3100:192.168.49.2:31100 \ + -L 3000:192.168.49.2:31000 \ + douxu@192.168.1.101 +``` + +#### 7.3 端口映射说明 + +| Mac 本地端口 | Minikube NodePort | 服务 | 说明 | +| :--- | :--- | :--- | :--- | +| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 | +| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `https://localhost:15671` | +| `6379` | `30001` | Redis | 分布式锁 / 数据存储 | +| `4318` | `31318` | OTLP HTTP | OTel Trace 上报(Jaeger Collector) | +| `16686` | `31686` | Jaeger UI | 链路追踪查询 `http://localhost:16686` | +| `3100` | `31100` | Loki | 日志查询 API | +| `3000` | `31000` | Grafana | 可视化界面 `http://localhost:3000` | + +> **注意:** 隧道建立后,本地配置文件中所有服务地址均填 `localhost:<本地端口>`,无需修改即可在 $\text{Mac}$ 上直接运行服务。 + +#### 7.4 关闭隧道 + +前台运行时直接 `Ctrl+C`;后台运行时查找并终止进程: + +```bash +# 找到 ssh 隧道进程 +ps aux | grep "ssh -fN" +# 终止(替换为实际 PID) +kill <PID> +``` + +### 
8\. 后续操作(停止与清理) + +#### 8.1 停止容器 ```bash docker stop postgres redis ``` -#### 4.2 删除容器(删除后数据将丢失) +#### 8.2 删除容器(删除后数据将丢失) ```bash docker rm postgres redis diff --git a/deploy/dockerfile/modelrt.Dockerfile b/deploy/dockerfile/modelrt.Dockerfile index e251642..c091aae 100644 --- a/deploy/dockerfile/modelrt.Dockerfile +++ b/deploy/dockerfile/modelrt.Dockerfile @@ -1,19 +1,35 @@ FROM golang:1.24-alpine AS builder +RUN apk --no-cache upgrade WORKDIR /app -COPY go.mod . -COPY go.sum . +COPY go.mod go.sum ./ RUN GOPROXY="https://goproxy.cn,direct" go mod download COPY . . -RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o modelrt main.go +RUN CGO_ENABLED=0 GOOS=linux go build \ + -ldflags="-s -w" \ + -trimpath \ + -mod=readonly \ + -o modelrt main.go -FROM alpine:latest -WORKDIR /app +# Prepare runtime dependencies in a pinned Alpine stage so they can be +# copied into scratch without pulling any vulnerable OS packages at run time. +FROM alpine:3.21 AS certs ARG USER_ID=1000 -RUN adduser -D -u ${USER_ID} modelrt +RUN apk --no-cache add ca-certificates tzdata && \ + adduser -D -u ${USER_ID} modelrt + +FROM scratch +# CA certificates required for TLS connections (RabbitMQ amqps://) +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +# Timezone data +COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo +# Non-root user/group definitions +COPY --from=certs /etc/passwd /etc/passwd +COPY --from=certs /etc/group /etc/group + +WORKDIR /app COPY --from=builder /app/modelrt ./modelrt COPY configs/config.example.yaml ./configs/config.example.yaml -RUN chown -R modelrt:modelrt /app -RUN chmod +x /app/modelrt + USER modelrt -CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"] \ No newline at end of file +CMD ["/app/modelrt", "-modelRT_config_dir=/app/configs"] diff --git a/deploy/k8s/grafana-configmap.yaml b/deploy/k8s/grafana-configmap.yaml new file mode 100644 index 0000000..76b2cb0 --- /dev/null +++ b/deploy/k8s/grafana-configmap.yaml @@ -0,0 +1,26 @@ 
+apiVersion: v1 +kind: ConfigMap +metadata: + name: grafana-datasources + namespace: default +data: + datasources.yaml: | + apiVersion: 1 + datasources: + - name: Loki + type: loki + access: proxy + url: http://loki:3100 + isDefault: true + jsonData: + # derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger + derivedFields: + - matcherRegex: '"traceID":\s*"([a-f0-9]+)"' + name: TraceID + url: http://127.0.0.1:16686/trace/$${__value.raw} + targetBlank: true + - name: Jaeger + type: jaeger + uid: jaeger + access: proxy + url: http://jaeger:16686 diff --git a/deploy/k8s/grafana-deployment.yaml b/deploy/k8s/grafana-deployment.yaml new file mode 100644 index 0000000..9f23045 --- /dev/null +++ b/deploy/k8s/grafana-deployment.yaml @@ -0,0 +1,41 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: grafana + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: grafana + template: + metadata: + labels: + app: grafana + spec: + containers: + - name: grafana + image: grafana/grafana:10.4.2 + ports: + - containerPort: 3000 + env: + - name: GF_SECURITY_ADMIN_USER + value: "coslight" + - name: GF_SECURITY_ADMIN_PASSWORD + value: "coslight@tj" + - name: GF_AUTH_ANONYMOUS_ENABLED + value: "false" + volumeMounts: + - name: datasources + mountPath: /etc/grafana/provisioning/datasources + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 128Mi + volumes: + - name: datasources + configMap: + name: grafana-datasources diff --git a/deploy/k8s/grafana-service.yaml b/deploy/k8s/grafana-service.yaml new file mode 100644 index 0000000..1cc3782 --- /dev/null +++ b/deploy/k8s/grafana-service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: grafana + namespace: default +spec: + ports: + - name: http + port: 3000 + targetPort: 3000 + nodePort: 31000 # Grafana UI: http://:31000 + selector: + app: grafana + type: NodePort diff --git a/deploy/jaeger.yaml b/deploy/k8s/jaeger-deployment.yaml similarity index 52% rename from 
deploy/jaeger.yaml rename to deploy/k8s/jaeger-deployment.yaml index 8dac477..c0444b7 100644 --- a/deploy/jaeger.yaml +++ b/deploy/k8s/jaeger-deployment.yaml @@ -1,31 +1,3 @@ -apiVersion: v1 -kind: Service -metadata: - name: jaeger - labels: - app: jaeger -spec: - ports: - - name: ui - port: 16686 - targetPort: 16686 - nodePort: 31686 # Jaeger UI,浏览器访问 http://:31686 - - name: collector-http - port: 14268 - targetPort: 14268 - nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel) - - name: otlp-http - port: 4318 - targetPort: 4318 - nodePort: 31318 # OTLP HTTP,集群外使用 :31318 - - name: otlp-grpc - port: 4317 - targetPort: 4317 - nodePort: 31317 # OTLP gRPC,集群外使用 :31317 - selector: - app: jaeger - type: NodePort ---- apiVersion: apps/v1 kind: Deployment metadata: diff --git a/deploy/k8s/jaeger-service.yaml b/deploy/k8s/jaeger-service.yaml new file mode 100644 index 0000000..d1e4779 --- /dev/null +++ b/deploy/k8s/jaeger-service.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: Service +metadata: + name: jaeger + labels: + app: jaeger +spec: + ports: + - name: ui + port: 16686 + targetPort: 16686 + nodePort: 31686 # Jaeger UI,浏览器访问 http://:31686 + - name: collector-http + port: 14268 + targetPort: 14268 + nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel) + - name: otlp-http + port: 4318 + targetPort: 4318 + nodePort: 31318 # OTLP HTTP,集群外使用 :31318 + - name: otlp-grpc + port: 4317 + targetPort: 4317 + nodePort: 31317 # OTLP gRPC,集群外使用 :31317 + selector: + app: jaeger + type: NodePort diff --git a/deploy/k8s/loki-configmap.yaml b/deploy/k8s/loki-configmap.yaml new file mode 100644 index 0000000..b126a9f --- /dev/null +++ b/deploy/k8s/loki-configmap.yaml @@ -0,0 +1,49 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: loki-config + namespace: default +data: + loki.yaml: | + auth_enabled: false + + server: + http_listen_port: 3100 + + ingester: + wal: + enabled: true + dir: /loki/wal # 指向 PVC 挂载路径,避免在容器根目录创建 /wal 时 permission denied + lifecycler: + ring: + kvstore: + store: 
inmemory + replication_factor: 1 + chunk_idle_period: 5m + chunk_retain_period: 30s + + schema_config: + configs: + - from: 2024-01-01 + store: boltdb-shipper + object_store: filesystem + schema: v11 + index: + prefix: index_ + period: 24h + + storage_config: + boltdb_shipper: + active_index_directory: /loki/index + cache_location: /loki/cache + shared_store: filesystem + filesystem: + directory: /loki/chunks + + limits_config: + reject_old_samples: true + reject_old_samples_max_age: 168h + + compactor: + working_directory: /loki/compactor + shared_store: filesystem diff --git a/deploy/k8s/loki-deployment.yaml b/deploy/k8s/loki-deployment.yaml new file mode 100644 index 0000000..63ff925 --- /dev/null +++ b/deploy/k8s/loki-deployment.yaml @@ -0,0 +1,45 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: loki + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: loki + template: + metadata: + labels: + app: loki + spec: + securityContext: + fsGroup: 10001 # 使 PVC 挂载目录对 Loki 默认用户(UID 10001)可写 + runAsUser: 10001 + runAsGroup: 10001 + containers: + - name: loki + image: grafana/loki:2.9.4 + args: + - -config.file=/etc/loki/loki.yaml + ports: + - containerPort: 3100 + volumeMounts: + - name: config + mountPath: /etc/loki + - name: storage + mountPath: /loki + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 128Mi + volumes: + - name: config + configMap: + name: loki-config + - name: storage + persistentVolumeClaim: + claimName: loki-pvc diff --git a/deploy/k8s/loki-pvc.yaml b/deploy/k8s/loki-pvc.yaml new file mode 100644 index 0000000..f329a08 --- /dev/null +++ b/deploy/k8s/loki-pvc.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: loki-pvc + namespace: default +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi diff --git a/deploy/k8s/loki-service.yaml b/deploy/k8s/loki-service.yaml new file mode 100644 index 0000000..e0df759 --- /dev/null 
+++ b/deploy/k8s/loki-service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: loki + namespace: default +spec: + ports: + - name: http + port: 3100 + targetPort: 3100 + nodePort: 31100 # 集群外访问: http://:31100 + selector: + app: loki + type: NodePort diff --git a/deploy/k8s/modelrt-certs-secret.sh b/deploy/k8s/modelrt-certs-secret.sh new file mode 100644 index 0000000..33a1bbd --- /dev/null +++ b/deploy/k8s/modelrt-certs-secret.sh @@ -0,0 +1,14 @@ +#!/bin/sh +# Create the modelrt client certificate secret. +# Run this script from the directory that contains the three cert files, +# or adjust the paths below to point at the actual files. +# +# Expected files (generated during RabbitMQ TLS setup): +# ca_certificate.pem +# modelrt_client_cert.pem +# modelrt_client_key.pem + +kubectl create secret generic modelrt-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=modelrt_client_cert.pem=./modelrt_client_cert.pem \ + --from-file=modelrt_client_key.pem=./modelrt_client_key.pem diff --git a/deploy/k8s/modelrt-configmap.yaml b/deploy/k8s/modelrt-configmap.yaml new file mode 100644 index 0000000..b1a6afc --- /dev/null +++ b/deploy/k8s/modelrt-configmap.yaml @@ -0,0 +1,86 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: modelrt-config +data: + config.yaml: | + postgres: + host: "192.168.1.101" + port: 5432 + database: "demo" + user: "postgres" + password: "" # injected via env POSTGRES_PASSWORD + + rabbitmq: + ca_cert_path: "/app/configs/certs/ca_certificate.pem" + client_key_path: "/app/configs/certs/modelrt_client_key.pem" + client_key_password: "" + client_cert_path: "/app/configs/certs/modelrt_client_cert.pem" + insecure_skip_verify: false + server_name: "rabbitmq-server" + user: "" + password: "" + host: "rabbitmq-service" + port: 5671 + + logger: + mode: "production" + level: "info" + filepath: "" + maxsize: 100 + maxbackups: 5 + maxage: 30 + compress: false + loki: + endpoint: "" # Promtail handles log collection 
in K8s, direct push disabled
+
+    otel:
+      endpoint: "jaeger:4318"
+      insecure: true
+
+    ants:
+      parse_concurrent_quantity: 10
+      rtd_receive_concurrent_quantity: 10
+
+    async_task:
+      worker_pool_size: 10
+      queue_consumer_count: 2
+      max_retry_count: 3
+      retry_initial_delay: 1s
+      retry_max_delay: 5m
+      health_check_interval: 30s
+
+    locker_redis:
+      addr: "redis-service:6379"
+      password: ""
+      db: 1
+      poolsize: 50
+      dial_timeout: 10
+      read_timeout: 10
+      write_timeout: 10
+
+    storage_redis:
+      addr: "redis-service:6379"
+      password: ""
+      db: 0
+      poolsize: 50
+      dial_timeout: 10
+      read_timeout: 10
+      write_timeout: 10
+
+    base:
+      grid_id: 1
+      zone_id: 1
+      station_id: 1
+
+    service:
+      service_addr: ":8080"
+      service_name: "modelRT"
+      secret_key: ""  # injected via env SERVICE_SECRET_KEY
+      deploy_env: "production"
+
+    dataRT:
+      host: "http://127.0.0.1"  # NOTE(review): inside a pod this resolves to the pod itself — confirm dataRT is reachable there
+      port: 8888
+      polling_api: "datart/getPointData"
+      polling_api_method: "GET"
diff --git a/deploy/k8s/modelrt-deployment.yaml b/deploy/k8s/modelrt-deployment.yaml
new file mode 100644
index 0000000..38a9a23
--- /dev/null
+++ b/deploy/k8s/modelrt-deployment.yaml
@@ -0,0 +1,90 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: modelrt
+  labels:
+    app: modelrt
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: modelrt
+  template:
+    metadata:
+      labels:
+        app: modelrt
+    spec:
+      containers:
+        - name: modelrt
+          image: coslight/modelrt:latest
+          imagePullPolicy: IfNotPresent
+          args:
+            - "-modelRT_config_dir=/app/configs"
+            - "-modelRT_config_name=config"
+            - "-modelRT_config_type=yaml"
+          ports:
+            - containerPort: 8080
+          env:
+            # Downward API — injected into every log line by logger/zap.go containerFields()
+            - name: K8S_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: K8S_NODE_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: spec.nodeName
+            # HOSTNAME is set automatically by K8s to the pod name
+            # Sensitive values injected from Secret so they stay out of ConfigMap
+            - name: POSTGRES_PASSWORD
+              valueFrom:
+                secretKeyRef:
+                  name: modelrt-secret
+                  key: postgres-password
+            - name: SERVICE_SECRET_KEY
+              valueFrom:
+                secretKeyRef:
+                  name: modelrt-secret
+                  key: secret-key
+          volumeMounts:
+            - name: config
+              mountPath: /app/configs/config.yaml
+              subPath: config.yaml
+              readOnly: true
+            - name: certs
+              mountPath: /app/configs/certs
+              readOnly: true
+          resources:
+            requests:
+              cpu: 100m
+              memory: 128Mi
+            limits:
+              cpu: 500m
+              memory: 512Mi
+          securityContext:
+            runAsUser: 1000
+            runAsNonRoot: true
+            readOnlyRootFilesystem: true
+            allowPrivilegeEscalation: false
+            capabilities:
+              drop:
+                - ALL
+          livenessProbe:
+            tcpSocket:
+              port: 8080
+            initialDelaySeconds: 10
+            periodSeconds: 30
+            failureThreshold: 3
+          readinessProbe:
+            tcpSocket:
+              port: 8080
+            initialDelaySeconds: 5
+            periodSeconds: 10
+            failureThreshold: 3
+      volumes:
+        - name: config
+          configMap:
+            name: modelrt-config
+        - name: certs
+          secret:
+            secretName: modelrt-certs
diff --git a/deploy/k8s/modelrt-secret.yaml b/deploy/k8s/modelrt-secret.yaml
new file mode 100644
index 0000000..e078e4d
--- /dev/null
+++ b/deploy/k8s/modelrt-secret.yaml
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: Secret
+metadata:
+  name: modelrt-secret
+type: Opaque
+# NOTE(review): plaintext credentials are committed to the repository here.
+# Rotate them and supply real values out-of-band before any non-local deployment.
+stringData:
+  postgres-password: "coslight"
+  secret-key: "modelrt_key"
diff --git a/deploy/k8s/modelrt-service.yaml b/deploy/k8s/modelrt-service.yaml
new file mode 100644
index 0000000..88b2cf7
--- /dev/null
+++ b/deploy/k8s/modelrt-service.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: modelrt-service
+  labels:
+    app: modelrt
+spec:
+  type: NodePort
+  selector:
+    app: modelrt
+  ports:
+    - name: http
+      port: 8080
+      targetPort: 8080
+      nodePort: 30080
diff --git a/deploy/k8s/promtail-configmap.yaml b/deploy/k8s/promtail-configmap.yaml
new file mode 100644
index 0000000..0ccf089
--- /dev/null
+++ b/deploy/k8s/promtail-configmap.yaml
@@ -0,0 +1,61 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: promtail-config
+  namespace: default
+data:
+  promtail.yaml: |
+    server:
+      http_listen_port: 9080
+      grpc_listen_port: 0
+
+    positions:
+      filename: /tmp/positions.yaml
+
+    clients:
+      - url: http://loki:3100/loki/api/v1/push
+
+    scrape_configs:
+      - job_name: kubernetes-pods
+        kubernetes_sd_configs:
+          - role: pod
+        pipeline_stages:
+          # NOTE(review): container runtimes wrap each line (CRI or Docker JSON format);
+          # a `cri: {}` / `docker: {}` stage is probably needed before `json` — confirm.
+          # Parse the JSON log line emitted by zap and extract structured fields.
+          - json:
+              expressions:
+                level: level
+                traceID: traceID
+                spanID: spanID
+                caller: caller
+                pod: pod
+                namespace: namespace
+                node: node
+          # Promote key fields to Loki labels so Grafana can filter by instance / trace.
+          # NOTE(review): traceID is high-cardinality; Loki advises against such labels — confirm.
+          - labels:
+              level:
+              traceID:
+              pod:
+              namespace:
+              node:
+        relabel_configs:
+          - source_labels: [__meta_kubernetes_namespace]
+            target_label: namespace
+          - source_labels: [__meta_kubernetes_pod_name]
+            target_label: pod
+          - source_labels: [__meta_kubernetes_pod_container_name]
+            target_label: container
+          - source_labels: [__meta_kubernetes_pod_label_app]
+            target_label: app
+          # Only collect logs from pods that carry an `app` label.
+          - source_labels: [__meta_kubernetes_pod_label_app]
+            action: keep
+            regex: .+
+          # Point Promtail at the container log files of each discovered pod; without
+          # a __path__ label the target is dropped and no logs are shipped.
+          - source_labels: [__meta_kubernetes_pod_uid, __meta_kubernetes_pod_container_name]
+            separator: /
+            target_label: __path__
+            replacement: /var/log/pods/*$1/*.log
diff --git a/deploy/k8s/promtail-daemonset.yaml b/deploy/k8s/promtail-daemonset.yaml
new file mode 100644
index 0000000..eedf72d
--- /dev/null
+++ b/deploy/k8s/promtail-daemonset.yaml
@@ -0,0 +1,51 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: promtail
+  namespace: default
+spec:
+  selector:
+    matchLabels:
+      app: promtail
+  template:
+    metadata:
+      labels:
+        app: promtail
+    spec:
+      serviceAccountName: promtail
+      tolerations:
+        - key: node-role.kubernetes.io/master
+          effect: NoSchedule
+      containers:
+        - name: promtail
+          image: grafana/promtail:2.9.4
+          args:
+            - -config.file=/etc/promtail/promtail.yaml
+          ports:
+            - containerPort: 9080
+          volumeMounts:
+            - name: config
+              mountPath: /etc/promtail
+            - name: varlog
+              mountPath: /var/log
+              readOnly: true
+            - name: varlibdockercontainers
+              mountPath: /var/lib/docker/containers
+              readOnly: true
+          resources:
+            limits:
+              cpu: 200m
+              memory: 128Mi
+            requests:
+              cpu: 50m
+              memory: 64Mi
+      volumes:
+        - name: config
+          configMap:
+            name: promtail-config
+        - name: varlog
+          hostPath:
+            path: /var/log
+        - name: varlibdockercontainers
+          hostPath:
+            path: /var/lib/docker/containers
diff --git a/deploy/k8s/promtail-rbac.yaml b/deploy/k8s/promtail-rbac.yaml
new file mode 100644
index 0000000..2433f23
--- /dev/null
+++ b/deploy/k8s/promtail-rbac.yaml
@@ -0,0 +1,27 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: promtail
+  namespace: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: promtail
+rules:
+  - apiGroups: [""]
+    resources: ["nodes", "nodes/proxy", "services", "endpoints", "pods"]
+    verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: promtail
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: promtail
+subjects:
+  - kind: ServiceAccount
+    name: promtail
+    namespace: default
diff --git a/deploy/k8s/rabbitmq-config.yaml b/deploy/k8s/rabbitmq-config.yaml
new file mode 100644
index 0000000..a5cbad7
--- /dev/null
+++ b/deploy/k8s/rabbitmq-config.yaml
@@ -0,0 +1,33 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: rabbitmq-config
+data:
+  rabbitmq.conf: |
+    # Make sure PLAIN authentication is allowed
+    auth_mechanisms.1 = PLAIN
+    auth_mechanisms.2 = AMQPLAIN
+    auth_mechanisms.3 = EXTERNAL
+    # Allow the admin user to connect remotely
+    loopback_users.admin = false
+    # Default heartbeat and listener settings can be extended here
+    # Selects the certificate field used as the username for SSL connections
+    ssl_cert_login_from = common_name
+    # With this setting the broker is reachable only through the TLS port
+    listeners.tcp = none
+    listeners.ssl.default = 5671
+    # default user config
+    load_definitions = /etc/rabbitmq/definitions.json
+    # ssl config
+    ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
+    ssl_options.certfile = /etc/rabbitmq/certs/server_certificate.pem
+    ssl_options.keyfile = /etc/rabbitmq/certs/server_key.pem
+    ssl_options.verify = verify_peer
+    ssl_options.fail_if_no_peer_cert = true
+    # management config
+    management.ssl.port = 15671
+    management.ssl.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
+    management.ssl.certfile = /etc/rabbitmq/certs/server_certificate.pem
+    management.ssl.keyfile = /etc/rabbitmq/certs/server_key.pem
+    management.ssl.verify = verify_peer
+    management.ssl.fail_if_no_peer_cert = true
diff --git a/deploy/k8s/rabbitmq-deployment.yaml b/deploy/k8s/rabbitmq-deployment.yaml
new file mode 100644
index 0000000..758daca
--- /dev/null
+++ b/deploy/k8s/rabbitmq-deployment.yaml
@@ -0,0 +1,77 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: eventrt-rabbitmq
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: rabbitmq
+  template:
+    metadata:
+      labels:
+        app: rabbitmq
+    spec:
+      containers:
+        - name: rabbitmq
+          image: rabbitmq:4.1.1-management-alpine
+          ports:
+            - containerPort: 4369
+            - containerPort: 5671
+            - containerPort: 5672  # AMQP
+            - containerPort: 15671
+            - containerPort: 15672  # Management UI
+            - containerPort: 15691
+            - containerPort: 15692
+            - containerPort: 25672
+          env:
+            - name: RABBITMQ_DEFAULT_USER
+              valueFrom:
+                secretKeyRef:
+                  name: rabbitmq-secret
+                  key: rabbitmq-user
+            - name: RABBITMQ_DEFAULT_PASS
+              valueFrom:
+                secretKeyRef:
+                  name: rabbitmq-secret
+                  key: rabbitmq-pass
+            - name: RABBITMQ_ERLANG_COOKIE
+              valueFrom:
+                secretKeyRef:
+                  name: rabbitmq-secret
+                  key: erlang-cookie
+            - name: RABBITMQ_DEFAULT_VHOST
+              value: "/"
+          volumeMounts:
+            - name: rabbitmq-certs-volume
+              mountPath: /etc/rabbitmq/certs
+              readOnly: true
+            - name: rabbitmq-config-volume
+              mountPath: /etc/rabbitmq/rabbitmq.conf
+              subPath: rabbitmq.conf
+            # advanced.config is not mounted: the rabbitmq-config ConfigMap only provides
+            # rabbitmq.conf. Add that key and a matching subPath mount if it is needed.
+            - name: plugins-config-volume
+              mountPath: /etc/rabbitmq/enabled_plugins
+              subPath: enabled_plugins
+            - name: users-config-volume
+              mountPath: /etc/rabbitmq/definitions.json
+              subPath: definitions.json
+            - name: rabbitmq-data
+              mountPath: /var/lib/rabbitmq
+      volumes:
+        - name: rabbitmq-certs-volume
+          secret:
+            secretName: rabbitmq-certs
+        - name: rabbitmq-config-volume
+          configMap:
+            name: rabbitmq-config
+        # NOTE(review): rabbit-plugins-conf is not defined by the manifests in this patch — confirm it exists.
+        - name: plugins-config-volume
+          configMap:
+            name: rabbit-plugins-conf
+        - name: users-config-volume
+          configMap:
+            name: rabbitmq-users-definitions
+        - name: rabbitmq-data
+          emptyDir: {}
diff --git a/deploy/k8s/rabbitmq-secret.yaml b/deploy/k8s/rabbitmq-secret.yaml
new file mode 100644
index 0000000..eae46a1
--- /dev/null
+++ b/deploy/k8s/rabbitmq-secret.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Secret
+metadata:
+  name: rabbitmq-secret
+type: Opaque
+# NOTE(review): plaintext credentials are committed to the repository here.
+# Rotate them and supply real values out-of-band before any non-local deployment.
+stringData:
+  rabbitmq-user: "coslight"
+  rabbitmq-pass: "coslight@tj"
+  erlang-cookie: "secret-erlang-cookie"
diff --git a/deploy/k8s/rabbitmq-service.yaml b/deploy/k8s/rabbitmq-service.yaml
new file mode 100644
index 0000000..6cdb259
--- /dev/null
+++ b/deploy/k8s/rabbitmq-service.yaml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: rabbitmq-service
+spec:
+  type: NodePort  # NodePort makes external access convenient when running on Minikube
+  selector:
+    app: rabbitmq
+  ports:
+    - name: amqp-ssl
+      protocol: TCP
+      port: 5671
+      targetPort: 5671
+      nodePort: 30671
+    - name: amqp  # NOTE(review): rabbitmq.conf sets listeners.tcp = none — confirm 5672 is still wanted
+      protocol: TCP
+      port: 5672
+      targetPort: 5672
+      nodePort: 30672
+    - name: management-ssl
+      protocol: TCP
+      port: 15671
+      targetPort: 15671
+      nodePort: 31671
+    - name: management
+      protocol: TCP
+      port: 15672
+      targetPort: 15672
+      nodePort: 31672
diff --git a/deploy/k8s/rabbitmq-users-config.yaml b/deploy/k8s/rabbitmq-users-config.yaml
new file mode 100644
index 0000000..8de5f30
--- /dev/null
+++ b/deploy/k8s/rabbitmq-users-config.yaml
@@ -0,0 +1,77 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: rabbitmq-users-definitions
+data:
+  definitions.json: |
+    {
+      "users": [
+        {
+          "name": "coslight",
+          "password_hash": "Gl2XVEJwPwDZQF8ZhsYnvm83wMkdftY3/raxyntdZueyx/Uv",
+          "hashing_algorithm": "rabbit_password_hashing_sha256",
+          "tags": ["administrator"]
+        },
+        {
+          "name": "web-client",
+          "password_hash": "",
+          "hashing_algorithm": "rabbit_password_hashing_sha256",
+          "tags": ["management"]
+        },
+        {
+          "name": "modelrt-client",
+          "password_hash": "",
+          "hashing_algorithm": "rabbit_password_hashing_sha256",
+          "tags": ["management"]
+        },
+        {
+          "name": "eventrt-client",
+          "password_hash": "",
+          "hashing_algorithm": "rabbit_password_hashing_sha256",
+          "tags": ["management"]
+        }
+      ],
+      "vhosts": [ { "name": "/" } ],
+      "permissions": [
+        {
+          "user": "coslight",
+          "vhost": "/",
+          "configure": ".*",
+          "write": ".*",
+          "read": ".*"
+        },
+        {
+          "user": "web-client",
+          "vhost": "/",
+          "configure": "^$",
+          "write": ".*",
+          "read": ".*"
+        },
+        {
+          "user": "modelrt-client",
+          "vhost": "/",
+          "configure": ".*",
+          "write": ".*",
+          "read": ".*"
+        },
+        {
+          "user": "eventrt-client",
+          "vhost": "/",
+          "configure": ".*",
+          "write": ".*",
+          "read": ".*"
+        }
+      ],
+      "topic_permissions": [],
+      "parameters": [],
+      "global_parameters": [
+        {
+          "name": "cluster_name",
+          "value": "evnetrt-rabbitmq-cluster"
+        }
+      ],
+      "policies": [],
+      "queues": [],
+      "exchanges": [],
+      "bindings": []
+    }
diff --git a/deploy/k8s/redis-deployment.yaml b/deploy/k8s/redis-deployment.yaml
new file mode 100644
index 0000000..b2f08fc
--- /dev/null
+++ b/deploy/k8s/redis-deployment.yaml
@@ -0,0 +1,23 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: redis
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: redis
+  template:
+    metadata:
+      labels:
+        app: redis
+    spec:
+      containers:
+        - name: redis
+          image: redis/redis-stack-server:latest
+          resources:
+            limits:
+              memory: "128Mi"
+              cpu: "500m"
+          ports:
+            - containerPort: 6379
diff --git a/deploy/k8s/redis-service.yaml b/deploy/k8s/redis-service.yaml
new file mode 100644
index 0000000..ba82d15
--- /dev/null
+++ b/deploy/k8s/redis-service.yaml
@@ -0,0 +1,13 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: redis-service
+spec:
+  type: NodePort
+  selector:
+    app: redis
+  ports:
+    - port: 6379
+      targetPort: 6379
+      nodePort: 30001
+
diff --git a/handler/async_task_cancel_handler.go b/handler/async_task_cancel_handler.go
index 
6fa0bc3..40fa480 100644
--- a/handler/async_task_cancel_handler.go
+++ b/handler/async_task_cancel_handler.go
@@ -7,6 +7,8 @@ import (
 	"modelRT/constants"
 	"modelRT/database"
 	"modelRT/logger"
+	"modelRT/mq"
+	"modelRT/mq/event"
 	"modelRT/orm"
 
 	"github.com/gin-gonic/gin"
@@ -74,6 +76,13 @@ func AsyncTaskCancelHandler(c *gin.Context) {
 		return
 	}
 
+	// Best-effort lifecycle notification; a failure here must not fail the request.
+	if record, evtErr := event.NewTaskCancelledMessage(taskID.String(), string(asyncTask.TaskType)); evtErr != nil {
+		logger.Warn(ctx, "build task cancelled message failed", "task_id", taskID, "error", evtErr)
+	} else {
+		mq.TryEmitMessage(ctx, record)
+	}
+
 	err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40009, "task cancelled by user", orm.JSONMap{
 		"cancelled_at": timestamp,
 		"cancelled_by": "user",
diff --git a/handler/async_task_create_handler.go b/handler/async_task_create_handler.go
index 592460b..30b8531 100644
--- a/handler/async_task_create_handler.go
+++ b/handler/async_task_create_handler.go
@@ -5,6 +5,8 @@ import (
 	"modelRT/constants"
 	"modelRT/database"
 	"modelRT/logger"
+	"modelRT/mq"
+	"modelRT/mq/event"
 	"modelRT/network"
 	"modelRT/orm"
 	"modelRT/task"
@@ -77,6 +79,13 @@ func AsyncTaskCreateHandler(c *gin.Context) {
 	task.TaskMsgChan <- msg
 	logger.Info(ctx, "task enqueued to channel", "task_id", asyncTask.TaskID, "queue", constants.TaskQueueName)
 
+	// Best-effort lifecycle notification; a failure here must not fail the request.
+	if record, err := event.NewTaskSubmittedMessage(asyncTask.TaskID.String(), request.TaskType, 5); err != nil {
+		logger.Warn(ctx, "build task submitted message failed", "task_id", asyncTask.TaskID, "error", err)
+	} else {
+		mq.TryEmitMessage(ctx, record)
+	}
+
 	logger.Info(ctx, "async task created success", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
 
 	// return success response
diff --git a/main.go b/main.go
index f120c6f..8034f87 100644
--- a/main.go
+++ b/main.go
@@ -188,6 +188,8 @@ func main() {
 
 	// async push event to rabbitMQ
 	go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan)
+	// async push message (task state changes etc.) to rabbitMQ
+	go mq.PushMessageToRabbitMQ(ctx, mq.MessageMsgChan)
 	// async push task message to rabbitMQ
 	go task.PushTaskToRabbitMQ(ctx, modelRTConfig.RabbitMQConfig, task.TaskMsgChan)
 
diff --git a/mq/emit.go b/mq/emit.go
new file mode 100644
index 0000000..4c3dca6
--- /dev/null
+++ b/mq/emit.go
@@ -0,0 +1,36 @@
+// Package mq provides read or write access to message queue services
+package mq
+
+import (
+	"context"
+
+	"modelRT/logger"
+	"modelRT/mq/event"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+)
+
+// TryEmitMessage pushes a message record into MessageMsgChan non-blocking.
+// The trace context carried by ctx is captured alongside the record so the
+// publisher can propagate it. A nil record is ignored with a warning, and if
+// the channel is full the message is dropped and a warning is logged.
+func TryEmitMessage(ctx context.Context, record *event.EventRecord) {
+	// Guard first: the drop-path log below dereferences record.
+	if record == nil {
+		logger.Warn(ctx, "nil message record, nothing emitted")
+		return
+	}
+
+	carrier := make(map[string]string)
+	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier))
+	msg := &EventMessage{Record: record, TraceCarrier: carrier}
+	select {
+	case MessageMsgChan <- msg:
+	default:
+		logger.Warn(ctx, "message channel full, message dropped",
+			"event_uuid", record.EventUUID,
+			"category", record.Category,
+		)
+	}
+}
diff --git a/mq/event/task_event_gen.go b/mq/event/task_event_gen.go
new file mode 100644
index 0000000..ed32c4f
--- /dev/null
+++ b/mq/event/task_event_gen.go
@@ -0,0 +1,81 @@
+// Package event defines real time data event operation functions
+package event
+
+import (
+	"modelRT/constants"
+)
+
+// NewTaskSubmittedMessage creates a message record for when a task is submitted to the queue
+func NewTaskSubmittedMessage(taskID, taskType string, priority int) (*EventRecord, error) {
+	return NewPlatformEventRecord(
+		int(constants.EventGeneralPlatformSoft),
+		0,
+		"async_task_submitted",
+		WithCategory(constants.MessageTaskSubmittedCategory),
+		WithCondition(map[string]any{
+			"task_id":   taskID,
+			"task_type": taskType,
+			"priority":  priority,
+		}),
+	)
+}
+
+// NewTaskRunningMessage creates a message record for when a task begins execution
+func NewTaskRunningMessage(taskID, taskType string) (*EventRecord, error) {
+	return NewPlatformEventRecord(
+		int(constants.EventGeneralPlatformSoft),
+		0,
+		"async_task_running",
+		WithCategory(constants.MessageTaskRunningCategory),
+		WithCondition(map[string]any{
+			"task_id":   taskID,
+			"task_type": taskType,
+		}),
+	)
+}
+
+// NewTaskCompletedMessage creates a message record for when a task finishes successfully
+func NewTaskCompletedMessage(taskID, taskType string, executionMs int64) (*EventRecord, error) {
+	return NewPlatformEventRecord(
+		int(constants.EventGeneralPlatformSoft),
+		0,
+		"async_task_completed",
+		WithCategory(constants.MessageTaskCompletedCategory),
+		WithCondition(map[string]any{
+			"task_id":   taskID,
+			"task_type": taskType,
+		}),
+		WithResult(map[string]any{
+			"execution_ms": executionMs,
+		}),
+	)
+}
+
+// NewTaskFailedMessage creates a message record for when a task fails during execution
+func NewTaskFailedMessage(taskID, taskType, reason string) (*EventRecord, error) {
+	return NewPlatformEventRecord(
+		int(constants.EventGeneralPlatformSoft),
+		0,
+		"async_task_failed",
+		WithCategory(constants.MessageTaskFailedCategory),
+		WithCondition(map[string]any{
+			"task_id":   taskID,
+			"task_type": taskType,
+			"reason":    reason,
+		}),
+	)
+}
+
+// NewTaskCancelledMessage creates a message record for when a task is cancelled by a user
+func NewTaskCancelledMessage(taskID, taskType string) (*EventRecord, error) {
+	return NewPlatformEventRecord(
+		int(constants.EventGeneralPlatformSoft),
+		0,
+		"async_task_cancelled",
+		WithCategory(constants.MessageTaskCancelledCategory),
+		WithCondition(map[string]any{
+			"task_id":   taskID,
+			"task_type": taskType,
+		}),
+	)
+}
diff --git a/mq/publish_message.go b/mq/publish_message.go
new file mode 100644
index 0000000..46e796c
--- /dev/null
+++ b/mq/publish_message.go
@@ -0,0 +1,168 @@
+// Package mq provides read or write access to message queue services
+package mq
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"modelRT/constants"
+	"modelRT/logger"
+
+	amqp "github.com/rabbitmq/amqp091-go"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+)
+
+// MessageMsgChan buffers message records to be published to the message exchange asynchronously
+var MessageMsgChan chan *EventMessage
+
+func init() {
+	MessageMsgChan = make(chan *EventMessage, 10000)
+}
+
+// initMessageChannel opens a channel on the shared connection, declares the
+// dead-letter and message exchanges/queues with their bindings, and puts the
+// channel into publisher-confirm mode. On any failure the channel is closed
+// before the error is returned so it does not leak on the connection.
+func initMessageChannel(ctx context.Context) (*amqp.Channel, error) {
+	channel, err := GetConn().Channel()
+	if err != nil {
+		logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
+		return nil, err
+	}
+	// Release the channel on every early return below; cleared on success.
+	ready := false
+	defer func() {
+		if !ready {
+			_ = channel.Close() // best-effort cleanup, the setup error is what matters
+		}
+	}()
+
+	err = channel.ExchangeDeclare(constants.MessageDeadExchangeName, "topic", true, false, false, false, nil)
+	if err != nil {
+		logger.Error(ctx, "declare message dead letter exchange failed", "error", err)
+		return nil, err
+	}
+
+	_, err = channel.QueueDeclare(constants.MessageDeadQueueName, true, false, false, false, nil)
+	if err != nil {
+		logger.Error(ctx, "declare message dead letter queue failed", "error", err)
+		return nil, err
+	}
+
+	err = channel.QueueBind(constants.MessageDeadQueueName, constants.MessageDeadRoutingKey, constants.MessageDeadExchangeName, false, nil)
+	if err != nil {
+		logger.Error(ctx, "bind message dead letter queue failed", "error", err)
+		return nil, err
+	}
+
+	err = channel.ExchangeDeclare(constants.MessageExchangeName, "topic", true, false, false, false, nil)
+	if err != nil {
+		logger.Error(ctx, "declare message exchange failed", "error", err)
+		return nil, err
+	}
+
+	args := amqp.Table{
+		"x-dead-letter-exchange":    constants.MessageDeadExchangeName,
+		"x-dead-letter-routing-key": constants.MessageDeadRoutingKey,
+	}
+	_, err = channel.QueueDeclare(constants.MessageQueueName, true, false, false, false, args)
+	if err != nil {
+		logger.Error(ctx, "declare message queue failed", "error", err)
+		return nil, err
+	}
+
+	err = channel.QueueBind(constants.MessageQueueName, constants.MessageRoutingKey, constants.MessageExchangeName, false, nil)
+	if err != nil {
+		logger.Error(ctx, "bind message queue failed", "error", err)
+		return nil, err
+	}
+
+	if err := channel.Confirm(false); err != nil {
+		logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
+		return nil, err
+	}
+	ready = true
+	return channel, nil
+}
+
+// PushMessageToRabbitMQ publishes message records from msgChan to the message exchange.
+// The category of each record is used as the routing key so consumers can bind selectively.
+// Records are marked persistent to match the durable message queue. The loop ends when ctx
+// is cancelled or msgChan is closed; nil messages or records are skipped with a warning.
+func PushMessageToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) {
+	channel, err := initMessageChannel(ctx)
+	if err != nil {
+		logger.Error(ctx, "initializing message rabbitMQ channel failed", "error", err)
+		return
+	}
+	defer channel.Close()
+
+	confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
+
+	go func() {
+		for {
+			select {
+			case confirm, ok := <-confirms:
+				if !ok {
+					return
+				}
+				if !confirm.Ack {
+					logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Info(ctx, "push message to rabbitMQ stopped by context cancel")
+			return
+		case msg, ok := <-msgChan:
+			if !ok {
+				logger.Info(ctx, "push message to rabbitMQ stopped by msgChan closed")
+				return
+			}
+			// A nil message or record would panic below and take the publisher down.
+			if msg == nil || msg.Record == nil {
+				logger.Warn(ctx, "nil message record received, skipped")
+				continue
+			}
+
+			record := msg.Record
+
+			recordBytes, err := json.Marshal(record)
+			if err != nil {
+				logger.Error(ctx, "marshal message record failed", "event_uuid", record.EventUUID, "error", err)
+				continue
+			}
+
+			msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
+			headers := amqp.Table{}
+			otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers))
+
+			routingKey := record.Category
+			pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			err = channel.PublishWithContext(pubCtx,
+				constants.MessageExchangeName,
+				routingKey,
+				false,
+				false,
+				amqp.Publishing{
+					ContentType:  "text/plain",
+					DeliveryMode: amqp.Persistent, // survive broker restarts in the durable queue
+					Body:         recordBytes,
+					Headers:      headers,
+				})
+			cancel()
+
+			if err != nil {
+				logger.Error(ctx, "publish message to rabbitMQ failed", "routing_key", routingKey, "error", err)
+			}
+		}
+	}
+}
diff --git a/task/worker.go b/task/worker.go
index b51e00d..1febca4 100644
--- a/task/worker.go
+++ b/task/worker.go
@@ -13,6 +13,7 @@ import (
 	"modelRT/database"
 	"modelRT/logger"
 	"modelRT/mq"
+	"modelRT/mq/event"
 	"modelRT/orm"
 
 	"github.com/gofrs/uuid"
@@ -314,6 +315,13 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
 		return
 	}
 
+	// Best-effort lifecycle notification; a failure here must not affect task processing.
+	if record, err := event.NewTaskRunningMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType)); err != nil {
+		logger.Warn(ctx, "build task running message failed", "task_id", taskMsg.TaskID, "error", err)
+	} else {
+		mq.TryEmitMessage(ctx, record)
+	}
+
 	// Execute task using handler
 	startTime := time.Now()
 	err := w.dispatch(ctx, taskMsg.TaskType, taskMsg.TaskID, taskMsg.Params, &msg)
@@ -332,6 +340,13 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
 			logger.Error(ctx, "Failed to update task with error", "error", updateErr)
 		}
 
+		// Best-effort lifecycle notification; a failure here must not affect task processing.
+		if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr != nil {
+			logger.Warn(ctx, "build task failed message failed", "task_id", taskMsg.TaskID, "error", recErr)
+		} else {
+			mq.TryEmitMessage(ctx, record)
+		}
+
 		// Ack message even if task failed (we don't want to retry indefinitely)
 		msg.Ack(false)
 		w.metrics.mu.Lock()
@@ -347,6 +362,13 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
 		// Still ack the message since task was processed successfully
 	}
 
+	// Best-effort lifecycle notification; a failure here must not affect task processing.
+	if record, err := event.NewTaskCompletedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), processingTime.Milliseconds()); err != nil {
+		logger.Warn(ctx, "build task completed message failed", "task_id", taskMsg.TaskID, "error", err)
+	} else {
+		mq.TryEmitMessage(ctx, record)
+	}
+
 	// Acknowledge message
 	msg.Ack(false)