refactor: rename TaskParams to Params and remove debug prints

- rename TaskParams interface to Params in task/base_task.go for brevity                                                     - remove debug fmt.Println/Printf statements from graph.go and handler_factory.go
  - fix is_local flag from false to true for existing test components in deploy.md                                             - add 6 new test component records (ns4-ns8) to deploy seed data
This commit is contained in:
douxu 2026-04-28 17:41:28 +08:00
parent 33f7d758e5
commit 9661278935
11 changed files with 661 additions and 187 deletions

View File

@ -46,4 +46,10 @@ var (
// ErrCacheQueryFailed define variable to indicates query cached data by token failed.
ErrCacheQueryFailed = newError(60003, "query cached data by token failed")
// ErrTaskNotFound indicates the async task with the given ID does not exist.
ErrTaskNotFound = newError(40008, "async task not found")
// ErrTaskCannotCancel indicates the task is already running or completed and cannot be cancelled.
ErrTaskCannotCancel = newError(40009, "task cannot be cancelled, already running or completed")
)

View File

@ -136,7 +136,7 @@ VALUES
'ns1', 'tag1', 'component1', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
false,
true,
-1, -1,
'{}',
'{}',
@ -149,7 +149,7 @@ VALUES
'ns2', 'tag2', 'component2', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
false,
true,
-1, -1,
'{}',
'{}',
@ -162,13 +162,78 @@ VALUES
'ns3', 'tag3', 'component3', 'bus_1', '',
'grid1', 'zone1', 'station2', 2,
-1,
false,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
),
(
'70c190f2-8a60-42a9-b143-ec5f87e0aa6b',
'ns4', 'tag4', 'component4', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
),
(
'10f155cf-bd27-4557-85b2-d126b6e2657f',
'ns5', 'tag5', 'component5', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
),
(
'e32bc0be-67f4-4d79-a5da-eaa40a5bd77d',
'ns6', 'tag6', 'component6', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
),
(
'70c190f2-8a75-42a9-b166-ec5f87e0aa6b',
'ns7', 'tag7', 'component7', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
),
(
'70c200f2-8a75-42a9-c166-bf5f87e0aa6b',
'ns8', 'tag8', 'component8', 'bus_1', '',
'grid1', 'zone1', 'station1', 1,
-1,
true,
-1, -1,
'{}',
'{}',
'{}',
-1,
CURRENT_TIMESTAMP
);
INSERT INTO public.measurement (id, tag, name, type, size, data_source, event_plan, bay_uuid, component_uuid, op, ts)

View File

@ -112,7 +112,6 @@ func (g *Graph) DelEdge(from, to uuid.UUID) error {
return fmt.Errorf("delete edge failed: %w", err)
}
fmt.Println("fromKeys:", fromKeys)
for _, fromUUID := range fromKeys {
fromKey := fromUUID.String()
var delIndex int

539
doc/async_task_api.md Normal file
View File

@ -0,0 +1,539 @@
# ModelRT 异步任务 API 文档
## 1. 概述
ModelRT 异步任务系统基于 RabbitMQ 消息驱动,提供完整的任务生命周期管理。任务创建后进入队列,由后台 Worker 消费并执行,调用方可通过轮询接口获取进度和结果。
**Base URL**: `http://{host}:{port}`(默认 `localhost:8080`
**鉴权**: 公开接口需在 Header 中携带 `X-Service-Token`(由服务端启动时生成)。
---
## 2. API 端点总览
| 方法 | 路径 | 描述 | 权限 |
| :----- | :--------------------------------- | :--------------- | :----- |
| POST | `/task/async` | 创建异步任务 | 公开 |
| GET | `/task/async/results` | 批量查询任务结果 | 公开 |
| GET | `/task/async/{task_id}` | 查询单个任务详情 | 公开 |
| POST | `/task/async/{task_id}/cancel` | 取消异步任务 | 公开 |
| POST | `/task/internal/async/progress` | 更新任务进度 | 内部 |
| POST | `/task/internal/async/status` | 更新任务状态 | 内部 |
---
## 3. 通用响应结构
### 成功响应
```json
{
"code": 2000,
"msg": "success message",
"payload": {}
}
```
### 失败响应
```json
{
"code": 4001,
"msg": "error description",
"payload": null
}
```
### 响应码说明
| code | 含义 |
| :--- | :--------------------- |
| 2000 | 成功 |
| 3000 | 处理失败 |
| 4001 | 请求参数无效 |
| 4002 | 未授权Token 无效) |
| 5000 | 服务器内部错误 |
---
## 4. 详细接口说明
### 4.1 创建异步任务
**POST** `/task/async`
创建新的异步任务,任务进入队列等待 Worker 消费。返回 `task_id` 用于后续查询。
**请求头**
```
Content-Type: application/json
X-Service-Token: <token>
```
**请求体**
```json
{
"task_type": "TOPOLOGY_ANALYSIS",
"params": { }
}
```
| 字段 | 类型 | 必填 | 说明 |
| :---------- | :----- | :--- | :-------------------------------------------------------------------------------- |
| `task_type` | string | 是 | 任务类型,枚举值见 §5.1 |
| `params` | object | 是 | 任务参数,不同任务类型的参数结构见 §5.3 |
**成功响应** `200`
```json
{
"code": 2000,
"msg": "task created successfully",
"payload": {
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
}
```
**失败响应**
| 场景 | code | msg |
| :----------------- | :--- | :-------------------------- |
| 参数格式错误 | 4001 | invalid request parameters |
| task_type 不合法 | 4001 | invalid task type |
| params 内容不合法 | 4001 | invalid task parameters |
| 数据库连接失败 | 5000 | database connection error |
| 任务写库失败 | 5000 | failed to create task |
**curl 示例**
```bash
curl -X POST "http://localhost:8080/task/async" \
-H "Content-Type: application/json" \
-H "X-Service-Token: <token>" \
-d '{
"task_type": "TOPOLOGY_ANALYSIS",
"params": {
"start_component_uuid": "550e8400-e29b-41d4-a716-446655440000",
"end_component_uuid": "550e8400-e29b-41d4-a716-446655440001",
"check_in_service": true
}
}'
```
---
### 4.2 批量查询任务结果
**GET** `/task/async/results`
根据一组任务 ID 批量查询状态和结果,适用于轮询多个任务。
**Query 参数**
| 参数 | 类型 | 必填 | 说明 |
| :--------- | :----- | :--- | :----------------------------------- |
| `task_ids` | string | 是 | 逗号分隔的 UUID 列表,最少 1 个 |
**请求示例**
```
GET /task/async/results?task_ids=123e4567-e89b-12d3-a456-426614174000,223e4567-e89b-12d3-a456-426614174001
```
**成功响应** `200`
```json
{
"code": 2000,
"msg": "query completed",
"payload": {
"total": 2,
"tasks": [
{
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"task_type": "TOPOLOGY_ANALYSIS",
"status": "RUNNING",
"progress": 50,
"created_at": 1741846200
},
{
"task_id": "223e4567-e89b-12d3-a456-426614174001",
"task_type": "PERFORMANCE_ANALYSIS",
"status": "COMPLETED",
"progress": 100,
"created_at": 1741846200,
"finished_at": 1741846260,
"result": {
"components_analyzed": 5
}
}
]
}
}
```
**失败响应**
| 场景 | code | msg |
| :----------------- | :--- | :-------------------------- |
| 缺少 task_ids | 4001 | task_ids parameter is required |
| UUID 格式不合法 | 4001 | invalid task ID format |
| 数据库连接失败 | 5000 | database connection error |
| 查询失败 | 5000 | failed to query tasks |
**curl 示例**
```bash
curl "http://localhost:8080/task/async/results?task_ids=123e4567-e89b-12d3-a456-426614174000"
```
---
### 4.3 查询单个任务详情
**GET** `/task/async/{task_id}`
查询单个任务的完整信息,包含结果或错误详情。
**路径参数**
| 参数 | 类型 | 必填 | 说明 |
| :-------- | :----- | :--- | :-------------- |
| `task_id` | string | 是 | 任务 UUID |
**成功响应** `200`
```json
{
"code": 2000,
"msg": "query completed",
"payload": {
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"task_type": "TOPOLOGY_ANALYSIS",
"status": "COMPLETED",
"progress": 100,
"created_at": 1741846200,
"finished_at": 1741846260,
"result": {
"path_exists": true,
"path_length": 3,
"path_nodes": ["comp-001", "comp-005", "comp-999"]
}
}
}
```
任务失败时 payload 附带错误信息:
```json
{
"code": 2000,
"msg": "query completed",
"payload": {
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"task_type": "TOPOLOGY_ANALYSIS",
"status": "FAILED",
"created_at": 1741846200,
"finished_at": 1741846210,
"error_code": 400102,
"error_message": "Component UUID not found",
"error_detail": {
"component_uuid": "550e8400-0000-0000-0000-000000000000"
}
}
}
```
**失败响应**
| 场景 | code | msg |
| :----------------- | :--- | :-------------------------- |
| 缺少 task_id | 4001 | task_id parameter is required |
| UUID 格式不合法 | 4001 | invalid task ID format |
| 任务不存在 | 404 | task not found |
| 数据库连接失败 | 5000 | database connection error |
**curl 示例**
```bash
curl "http://localhost:8080/task/async/123e4567-e89b-12d3-a456-426614174000"
```
---
### 4.4 取消异步任务
**POST** `/task/async/{task_id}/cancel`
取消指定任务。**仅 `SUBMITTED`(排队中)状态的任务可以被取消**,已开始执行(`RUNNING`)或已结束的任务无法取消。
取消后任务状态变为 `FAILED`,错误码 `40003`,错误信息 `task cancelled by user`
**路径参数**
| 参数 | 类型 | 必填 | 说明 |
| :-------- | :----- | :--- | :-------- |
| `task_id` | string | 是 | 任务 UUID |
**成功响应** `200`
```json
{
"code": 2000,
"msg": "task cancelled successfully"
}
```
**失败响应**
| 场景 | code | msg |
| :----------------------- | :--- | :--------------------------------------------------------- |
| 缺少 task_id | 400 | task_id parameter is required |
| UUID 格式不合法 | 400 | invalid task ID format |
| 任务不存在 | 404 | task not found |
| 任务已执行或已完成 | 400 | task cannot be cancelled (already running or completed) |
| 数据库连接失败 | 500 | database connection error |
| 取消操作失败 | 500 | failed to cancel task |
**curl 示例**
```bash
curl -X POST "http://localhost:8080/task/async/123e4567-e89b-12d3-a456-426614174000/cancel"
```
---
### 4.5 内部接口:更新任务进度
**POST** `/task/internal/async/progress`
由 Worker 内部调用更新任务执行进度0-100
**请求体**
```json
{
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"progress": 75
}
```
| 字段 | 类型 | 必填 | 说明 |
| :--------- | :----- | :--- | :---------------- |
| `task_id` | string | 是 | 任务 UUID |
| `progress` | int | 是 | 进度值,范围 0-100 |
**成功响应** `200`
```json
{
"code": 2000,
"msg": "task progress updated successfully",
"payload": null
}
```
---
### 4.6 内部接口:更新任务状态
**POST** `/task/internal/async/status`
由 Worker 内部调用,更新任务状态。当状态更新为 `COMPLETED``FAILED` 时,同步写入 `finished_at` 时间戳。
**请求体**
```json
{
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"status": "RUNNING",
"timestamp": 1741846205
}
```
| 字段 | 类型 | 必填 | 说明 |
| :---------- | :----- | :--- | :--------------------------------------- |
| `task_id` | string | 是 | 任务 UUID |
| `status` | string | 是 | 目标状态,枚举值见 §5.2 |
| `timestamp` | int64 | 是 | 状态变更时间戳Unix 秒) |
**成功响应** `200`
```json
{
"code": 2000,
"msg": "task status updated successfully",
"payload": null
}
```
---
## 5. 数据结构参考
### 5.1 任务类型task_type
| 枚举值 | 描述 |
| :--------------------- | :----------- |
| `TOPOLOGY_ANALYSIS` | 拓扑连通性分析 |
| `PERFORMANCE_ANALYSIS` | 性能分析 |
| `EVENT_ANALYSIS` | 事件分析 |
| `BATCH_IMPORT` | 批量数据导入 |
| `TEST` | 测试任务(系统验证用) |
### 5.2 任务状态status
| 枚举值 | 描述 | 可转换至 |
| :---------- | :----------------- | :------------------------------ |
| `SUBMITTED` | 已提交至队列 | `RUNNING`, `FAILED`(取消) |
| `RUNNING` | 正在执行 | `COMPLETED`, `FAILED` |
| `COMPLETED` | 执行成功 | — |
| `FAILED` | 执行失败或被取消 | — |
### 5.3 各任务类型的 params 结构
#### TOPOLOGY_ANALYSIS — 拓扑连通性分析
分析两个元件之间是否存在连通路径。
```json
{
"start_component_uuid": "550e8400-e29b-41d4-a716-446655440000",
"end_component_uuid": "550e8400-e29b-41d4-a716-446655440001",
"check_in_service": true
}
```
| 字段 | 类型 | 必填 | 说明 |
| :--------------------- | :------ | :--- | :------------------------------------- |
| `start_component_uuid` | string | 是 | 起始元件 UUID |
| `end_component_uuid` | string | 是 | 目标元件 UUID |
| `check_in_service` | boolean | 否 | 是否只检查投运状态元件,默认 `true` |
#### PERFORMANCE_ANALYSIS — 性能分析
```json
{
"component_ids": ["comp-001", "comp-002"],
"time_range": {
"start": "2026-03-01T00:00:00Z",
"end": "2026-03-02T00:00:00Z"
}
}
```
| 字段 | 类型 | 必填 | 说明 |
| :--------------- | :------- | :--- | :------------------ |
| `component_ids` | []string | 是 | 待分析的元件 ID 列表(至少 1 个) |
| `time_range` | object | 否 | 分析时间范围 |
#### EVENT_ANALYSIS — 事件分析
```json
{
"event_type": "MOTOR_START",
"start_time": "2026-03-01T00:00:00Z",
"end_time": "2026-03-02T00:00:00Z",
"components": ["comp-001", "comp-002"]
}
```
| 字段 | 类型 | 必填 | 说明 |
| :------------ | :------- | :--- | :------------------ |
| `event_type` | string | 是 | 事件类型 |
| `start_time` | string | 否 | 事件起始时间RFC3339 |
| `end_time` | string | 否 | 事件截止时间RFC3339 |
| `components` | []string | 否 | 关联的元件列表 |
#### BATCH_IMPORT — 批量导入
```json
{
"file_path": "/data/import/model.csv",
"file_type": "CSV",
"options": {
"overwrite": false,
"validate": true,
"notify_user": true
}
}
```
| 字段 | 类型 | 必填 | 说明 |
| :--------------------- | :------ | :--- | :-------------------------------- |
| `file_path` | string | 是 | 导入文件路径 |
| `file_type` | string | 否 | 文件类型:`CSV`, `JSON`, `XML` |
| `options.overwrite` | boolean | 否 | 是否覆盖已有数据,默认 `false` |
| `options.validate` | boolean | 否 | 是否校验数据,默认 `true` |
| `options.notify_user` | boolean | 否 | 完成后是否通知用户,默认 `true` |
#### TEST — 测试任务
```json
{
"sleep_duration": 30
}
```
| 字段 | 类型 | 必填 | 说明 |
| :--------------- | :--- | :--- | :---------------------------------------------- |
| `sleep_duration` | int | 否 | 模拟执行耗时(秒),默认 60最大 3600 |
### 5.4 任务结果对象AsyncTaskResult
| 字段 | 类型 | 说明 |
| :-------------- | :----- | :--------------------------------------------------- |
| `task_id` | string | 任务 UUID |
| `task_type` | string | 任务类型 |
| `status` | string | 当前状态 |
| `progress` | int | 进度0-100`RUNNING` 时返回 |
| `created_at` | int64 | 创建时间戳Unix 秒) |
| `finished_at` | int64 | 完成时间戳Unix 秒),仅 `COMPLETED`/`FAILED` 返回 |
| `result` | object | 任务结果,仅 `COMPLETED` 时返回 |
| `error_code` | int | 错误码,仅 `FAILED` 时返回 |
| `error_message` | string | 错误描述,仅 `FAILED` 时返回 |
| `error_detail` | object | 错误详情,仅 `FAILED` 时返回 |
---
## 6. 典型调用流程
```
创建任务
└─ POST /task/async
└─ 返回 task_id
轮询状态(建议间隔 2-5 秒)
└─ GET /task/async/{task_id}
├─ status=SUBMITTED → 继续等待
├─ status=RUNNING → 查看 progress
├─ status=COMPLETED → 读取 result 字段
└─ status=FAILED → 读取 error_code / error_message
如需中止(仅 SUBMITTED 状态有效)
└─ POST /task/async/{task_id}/cancel
```
---
## 7. 队列配置参考
| 配置项 | 值 |
| :------------- | :-------------------------- |
| Exchange | `modelrt.tasks.exchange` |
| Queue | `modelrt.tasks.queue` |
| Routing Key | `modelrt.task` |
| 优先级范围 | 010默认 5 |
| 消息 TTL | 24 小时 |
| 最大重试次数 | 3 次 |
| 重试初始延迟 | 1 秒(指数退避,最大 5 分钟)|
---
**文档版本**: 1.1
**最后更新**: 2026-04-28
**相关文档**: [异步任务系统设计文档](./async_task_system.md)

View File

@ -2,12 +2,11 @@
package handler
import (
"net/http"
"time"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
"modelRT/orm"
"github.com/gin-gonic/gin"
@ -23,97 +22,65 @@ import (
// @Produce json
// @Param task_id path string true "任务ID"
// @Success 200 {object} network.SuccessResponse "任务取消成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误或任务无法取消"
// @Failure 404 {object} network.FailureResponse "任务不存在"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Failure 200 {object} network.FailureResponse "请求参数错误或任务无法取消"
// @Router /task/async/{task_id}/cancel [post]
func AsyncTaskCancelHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task ID from path parameter
taskIDStr := c.Param("task_id")
if taskIDStr == "" {
logger.Error(ctx, "task_id parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_id parameter is required",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task_id parameter is required", nil)
return
}
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil)
return
}
pgClient := database.GetPostgresDBClient()
if pgClient == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
return
}
// Query task from database
asyncTask, err := database.GetAsyncTaskByID(ctx, pgClient, taskID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Error(ctx, "async task not found", "task_id", taskID)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusNotFound,
Msg: "task not found",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task not found", nil)
return
}
logger.Error(ctx, "failed to query async task from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to query task", nil)
return
}
// Check if task can be cancelled (only SUBMITTED tasks can be cancelled)
if asyncTask.Status != orm.AsyncTaskStatusSubmitted {
logger.Error(ctx, "task cannot be cancelled", "task_id", taskID, "status", asyncTask.Status)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task cannot be cancelled (already running or completed)",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task cannot be cancelled, already running or completed", nil)
return
}
// Update task status to failed with cancellation reason
timestamp := time.Now().Unix()
err = database.FailAsyncTask(ctx, pgClient, taskID, timestamp)
if err != nil {
logger.Error(ctx, "failed to cancel async task", "task_id", taskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to cancel task",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to cancel task", nil)
return
}
// Update task result with cancellation error
err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40003, "task cancelled by user", orm.JSONMap{
err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40009, "task cancelled by user", orm.JSONMap{
"cancelled_at": timestamp,
"cancelled_by": "user",
})
if err != nil {
logger.Error(ctx, "failed to update task result with cancellation error", "task_id", taskID, "error", err)
// Continue anyway since task is already marked as failed
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task cancelled successfully",
})
renderRespSuccess(c, constants.RespCodeSuccess, "task cancelled successfully", nil)
}

View File

@ -2,8 +2,7 @@
package handler
import (
"net/http"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
@ -18,37 +17,23 @@ func AsyncTaskProgressUpdateHandler(c *gin.Context) {
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(ctx, "failed to unmarshal async task progress update request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid request parameters",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid request parameters", nil)
return
}
pgClient := database.GetPostgresDBClient()
if pgClient == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
return
}
// Update task progress
err := database.UpdateAsyncTaskProgress(ctx, pgClient, request.TaskID, request.Progress)
if err != nil {
logger.Error(ctx, "failed to update async task progress", "task_id", request.TaskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task progress",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to update task progress", nil)
return
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task progress updated successfully",
Payload: nil,
})
renderRespSuccess(c, constants.RespCodeSuccess, "task progress updated successfully", nil)
}

View File

@ -2,8 +2,7 @@
package handler
import (
"net/http"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
@ -21,75 +20,51 @@ import (
// @Produce json
// @Param task_id path string true "任务ID"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResult} "查询成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误"
// @Failure 404 {object} network.FailureResponse "任务不存在"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Failure 200 {object} network.FailureResponse "请求参数错误"
// @Router /task/async/{task_id} [get]
func AsyncTaskResultDetailHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task ID from path parameter
taskIDStr := c.Param("task_id")
if taskIDStr == "" {
logger.Error(ctx, "task_id parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_id parameter is required",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task_id parameter is required", nil)
return
}
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil)
return
}
pgClient := database.GetPostgresDBClient()
if pgClient == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
return
}
// Query task from database
asyncTask, err := database.GetAsyncTaskByID(ctx, pgClient, taskID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Error(ctx, "async task not found", "task_id", taskID)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusNotFound,
Msg: "task not found",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task not found", nil)
return
}
logger.Error(ctx, "failed to query async task from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to query task", nil)
return
}
// Query task result from database
taskResult, err := database.GetAsyncTaskResult(ctx, pgClient, taskID)
if err != nil {
logger.Error(ctx, "failed to query async task result from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task result",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to query task result", nil)
return
}
// Convert to response format
responseTask := network.AsyncTaskResult{
TaskID: asyncTask.TaskID,
TaskType: string(asyncTask.TaskType),
@ -99,7 +74,6 @@ func AsyncTaskResultDetailHandler(c *gin.Context) {
Progress: asyncTask.Progress,
}
// Add result or error information if available
if taskResult != nil {
if taskResult.Result != nil {
responseTask.Result = map[string]any(taskResult.Result)
@ -115,10 +89,5 @@ func AsyncTaskResultDetailHandler(c *gin.Context) {
}
}
// Return success response
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "query completed",
Payload: responseTask,
})
renderRespSuccess(c, constants.RespCodeSuccess, "query completed", responseTask)
}

View File

@ -2,9 +2,9 @@
package handler
import (
"net/http"
"strings"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
@ -22,34 +22,25 @@ import (
// @Produce json
// @Param task_ids query string true "任务ID列表用逗号分隔"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResultQueryResponse} "查询成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Failure 200 {object} network.FailureResponse "请求参数错误"
// @Router /task/async/results [get]
func AsyncTaskResultQueryHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task IDs from query parameter
taskIDsParam := c.Query("task_ids")
if taskIDsParam == "" {
logger.Error(ctx, "task_ids parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_ids parameter is required",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "task_ids parameter is required", nil)
return
}
// Parse comma-separated task IDs
var taskIDs []uuid.UUID
taskIDStrs := splitCommaSeparated(taskIDsParam)
for _, taskIDStr := range taskIDStrs {
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil)
return
}
taskIDs = append(taskIDs, taskID)
@ -57,52 +48,36 @@ func AsyncTaskResultQueryHandler(c *gin.Context) {
if len(taskIDs) == 0 {
logger.Error(ctx, "no valid task IDs provided")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "no valid task IDs provided",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "no valid task IDs provided", nil)
return
}
pgClient := database.GetPostgresDBClient()
if pgClient == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
return
}
// Query tasks from database
asyncTasks, err := database.GetAsyncTasksByIDs(ctx, pgClient, taskIDs)
if err != nil {
logger.Error(ctx, "failed to query async tasks from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query tasks",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to query tasks", nil)
return
}
// Query task results from database
taskResults, err := database.GetAsyncTaskResults(ctx, pgClient, taskIDs)
if err != nil {
logger.Error(ctx, "failed to query async task results from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task results",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to query task results", nil)
return
}
// Create a map of task results for easy lookup
taskResultMap := make(map[uuid.UUID]orm.AsyncTaskResult)
for _, result := range taskResults {
taskResultMap[result.TaskID] = result
}
// Convert to response format
var responseTasks []network.AsyncTaskResult
for _, asyncTask := range asyncTasks {
taskResult := network.AsyncTaskResult{
@ -114,7 +89,6 @@ func AsyncTaskResultQueryHandler(c *gin.Context) {
Progress: asyncTask.Progress,
}
// Add result or error information if available
if result, exists := taskResultMap[asyncTask.TaskID]; exists {
if result.Result != nil {
taskResult.Result = map[string]any(result.Result)
@ -133,14 +107,9 @@ func AsyncTaskResultQueryHandler(c *gin.Context) {
responseTasks = append(responseTasks, taskResult)
}
// Return success response
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "query completed",
Payload: network.AsyncTaskResultQueryResponse{
Total: len(responseTasks),
Tasks: responseTasks,
},
renderRespSuccess(c, constants.RespCodeSuccess, "query completed", network.AsyncTaskResultQueryResponse{
Total: len(responseTasks),
Tasks: responseTasks,
})
}

View File

@ -2,8 +2,7 @@
package handler
import (
"net/http"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
@ -19,14 +18,10 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) {
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(ctx, "failed to unmarshal async task status update request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid request parameters",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid request parameters", nil)
return
}
// Validate status
validStatus := map[string]bool{
string(orm.AsyncTaskStatusSubmitted): true,
string(orm.AsyncTaskStatusRunning): true,
@ -36,36 +31,25 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) {
if !validStatus[request.Status] {
logger.Error(ctx, "invalid task status", "status", request.Status)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task status",
})
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task status", nil)
return
}
pgClient := database.GetPostgresDBClient()
if pgClient == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
return
}
// Update task status
status := orm.AsyncTaskStatus(request.Status)
err := database.UpdateAsyncTaskStatus(ctx, pgClient, request.TaskID, status)
if err != nil {
logger.Error(ctx, "failed to update async task status", "task_id", request.TaskID, "status", request.Status, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task status",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to update task status", nil)
return
}
// If task is completed or failed, update finished_at timestamp
if request.Status == string(orm.AsyncTaskStatusCompleted) {
err = database.CompleteAsyncTask(ctx, pgClient, request.TaskID, request.Timestamp)
} else if request.Status == string(orm.AsyncTaskStatusFailed) {
@ -74,16 +58,9 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) {
if err != nil {
logger.Error(ctx, "failed to update async task completion timestamp", "task_id", request.TaskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task completion timestamp",
})
renderRespFailure(c, constants.RespCodeServerError, "failed to update task completion timestamp", nil)
return
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task status updated successfully",
Payload: nil,
})
renderRespSuccess(c, constants.RespCodeSuccess, "task status updated successfully", nil)
}

View File

@ -9,8 +9,8 @@ import (
"gorm.io/gorm"
)
// TaskParams defines the interface for task-specific parameters
type TaskParams interface {
// Params defines the interface for task-specific parameters
type Params interface {
Validate() error
GetType() UnifiedTaskType
ToMap() map[string]interface{}
@ -20,12 +20,12 @@ type TaskParams interface {
// BaseTask provides common functionality for all task implementations
type BaseTask struct {
taskType UnifiedTaskType
params TaskParams
params Params
name string
}
// NewBaseTask creates a new BaseTask instance
func NewBaseTask(taskType UnifiedTaskType, params TaskParams, name string) *BaseTask {
func NewBaseTask(taskType UnifiedTaskType, params Params, name string) *BaseTask {
return &BaseTask{
taskType: taskType,
params: params,
@ -37,7 +37,7 @@ func (t *BaseTask) GetType() UnifiedTaskType {
return t.taskType
}
func (t *BaseTask) GetParams() TaskParams {
func (t *BaseTask) GetParams() Params {
return t.params
}

View File

@ -158,7 +158,6 @@ func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID,
// check the start node itself before BFS
if !inServiceMap[startComponentUUID] {
fmt.Println(11111)
return persistTopologyResult(ctx, db, taskID, startComponentUUID, endComponentUUID,
checkInService, false, nil, &startComponentUUID)
}
@ -221,7 +220,6 @@ func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID,
// parseTopologyAnalysisParams extracts and validates the three required fields.
// check_in_service defaults to true when absent.
func parseTopologyAnalysisParams(params map[string]any) (startID, endID uuid.UUID, checkInService bool, err error) {
fmt.Printf("params:%+v\n", params)
startStr, ok := params["start_component_uuid"].(string)
if !ok || startStr == "" {
err = fmt.Errorf("missing or invalid start_component_uuid")