diff --git a/common/errcode/bussiness_error.go b/common/errcode/bussiness_error.go index 08fcfea..6ab5999 100644 --- a/common/errcode/bussiness_error.go +++ b/common/errcode/bussiness_error.go @@ -46,4 +46,10 @@ var ( // ErrCacheQueryFailed define variable to indicates query cached data by token failed. ErrCacheQueryFailed = newError(60003, "query cached data by token failed") + + // ErrTaskNotFound indicates the async task with the given ID does not exist. + ErrTaskNotFound = newError(40008, "async task not found") + + // ErrTaskCannotCancel indicates the task is already running or completed and cannot be cancelled. + ErrTaskCannotCancel = newError(40009, "task cannot be cancelled, already running or completed") ) diff --git a/deploy/deploy.md b/deploy/deploy.md index 3364838..0080b23 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -136,7 +136,7 @@ VALUES 'ns1', 'tag1', 'component1', 'bus_1', '', 'grid1', 'zone1', 'station1', 1, -1, - false, + true, -1, -1, '{}', '{}', @@ -149,7 +149,7 @@ VALUES 'ns2', 'tag2', 'component2', 'bus_1', '', 'grid1', 'zone1', 'station1', 1, -1, - false, + true, -1, -1, '{}', '{}', @@ -162,13 +162,78 @@ VALUES 'ns3', 'tag3', 'component3', 'bus_1', '', 'grid1', 'zone1', 'station2', 2, -1, - false, + true, -1, -1, '{}', '{}', '{}', -1, CURRENT_TIMESTAMP +), +( + '70c190f2-8a60-42a9-b143-ec5f87e0aa6b', + 'ns4', 'tag4', 'component4', 'bus_1', '', + 'grid1', 'zone1', 'station1', 1, + -1, + true, + -1, -1, + '{}', + '{}', + '{}', + -1, + CURRENT_TIMESTAMP +), +( + '10f155cf-bd27-4557-85b2-d126b6e2657f', + 'ns5', 'tag5', 'component5', 'bus_1', '', + 'grid1', 'zone1', 'station1', 1, + -1, + true, + -1, -1, + '{}', + '{}', + '{}', + -1, + CURRENT_TIMESTAMP +), +( + 'e32bc0be-67f4-4d79-a5da-eaa40a5bd77d', + 'ns6', 'tag6', 'component6', 'bus_1', '', + 'grid1', 'zone1', 'station1', 1, + -1, + true, + -1, -1, + '{}', + '{}', + '{}', + -1, + CURRENT_TIMESTAMP +), +( + '70c190f2-8a75-42a9-b166-ec5f87e0aa6b', + 'ns7', 'tag7', 'component7', 'bus_1', '', + 'grid1', 'zone1', 'station1', 1, + -1, + true, + -1, -1, + '{}', + '{}', + '{}', + -1, + CURRENT_TIMESTAMP +), +( + '70c200f2-8a75-42a9-c166-bf5f87e0aa6b', + 'ns8', 'tag8', 'component8', 'bus_1', '', + 'grid1', 'zone1', 'station1', 1, + -1, + true, + -1, -1, + '{}', + '{}', + '{}', + -1, + CURRENT_TIMESTAMP ); INSERT INTO public.measurement (id, tag, name, type, size, data_source, event_plan, bay_uuid, component_uuid, op, ts) diff --git a/diagram/graph.go b/diagram/graph.go index 1d9ee6e..6c4970d 100644 --- a/diagram/graph.go +++ b/diagram/graph.go @@ -112,7 +112,6 @@ func (g *Graph) DelEdge(from, to uuid.UUID) error { return fmt.Errorf("delete edge failed: %w", err) } - fmt.Println("fromKeys:", fromKeys) for _, fromUUID := range fromKeys { fromKey := fromUUID.String() var delIndex int diff --git a/doc/async_task_api.md b/doc/async_task_api.md new file mode 100644 index 0000000..6a474ed --- /dev/null +++ b/doc/async_task_api.md @@ -0,0 +1,539 @@ +# ModelRT 异步任务 API 文档 + +## 1. 概述 + +ModelRT 异步任务系统基于 RabbitMQ 消息驱动,提供完整的任务生命周期管理。任务创建后进入队列,由后台 Worker 消费并执行,调用方可通过轮询接口获取进度和结果。 + +**Base URL**: `http://{host}:{port}`(默认 `localhost:8080`) + +**鉴权**: 公开接口需在 Header 中携带 `X-Service-Token`(由服务端启动时生成)。 + +--- + +## 2. API 端点总览 + +| 方法 | 路径 | 描述 | 权限 | +| :----- | :--------------------------------- | :--------------- | :----- | +| POST | `/task/async` | 创建异步任务 | 公开 | +| GET | `/task/async/results` | 批量查询任务结果 | 公开 | +| GET | `/task/async/{task_id}` | 查询单个任务详情 | 公开 | +| POST | `/task/async/{task_id}/cancel` | 取消异步任务 | 公开 | +| POST | `/task/internal/async/progress` | 更新任务进度 | 内部 | +| POST | `/task/internal/async/status` | 更新任务状态 | 内部 | + +--- + +## 3. 通用响应结构 + +### 成功响应 + +```json +{ + "code": 2000, + "msg": "success message", + "payload": {} +} +``` + +### 失败响应 + +```json +{ + "code": 4001, + "msg": "error description", + "payload": null +} +``` + +### 响应码说明 + +| code | 含义 | +| :--- | :--------------------- | +| 2000 | 成功 | +| 3000 | 处理失败 | +| 4001 | 请求参数无效 | +| 4002 | 未授权(Token 无效) | +| 5000 | 服务器内部错误 | + +--- + +## 4. 详细接口说明 + +### 4.1 创建异步任务 + +**POST** `/task/async` + +创建新的异步任务,任务进入队列等待 Worker 消费。返回 `task_id` 用于后续查询。 + +**请求头** + +``` +Content-Type: application/json +X-Service-Token: +``` + +**请求体** + +```json +{ + "task_type": "TOPOLOGY_ANALYSIS", + "params": { } +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :---------- | :----- | :--- | :-------------------------------------------------------------------------------- | +| `task_type` | string | 是 | 任务类型,枚举值见 §5.1 | +| `params` | object | 是 | 任务参数,不同任务类型的参数结构见 §5.3 | + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "task created successfully", + "payload": { + "task_id": "123e4567-e89b-12d3-a456-426614174000" + } +} +``` + +**失败响应** + +| 场景 | code | msg | +| :----------------- | :--- | :-------------------------- | +| 参数格式错误 | 4001 | invalid request parameters | +| task_type 不合法 | 4001 | invalid task type | +| params 内容不合法 | 4001 | invalid task parameters | +| 数据库连接失败 | 5000 | database connection error | +| 任务写库失败 | 5000 | failed to create task | + +**curl 示例** + +```bash +curl -X POST "http://localhost:8080/task/async" \ + -H "Content-Type: application/json" \ + -H "X-Service-Token: " \ + -d '{ + "task_type": "TOPOLOGY_ANALYSIS", + "params": { + "start_component_uuid": "550e8400-e29b-41d4-a716-446655440000", + "end_component_uuid": "550e8400-e29b-41d4-a716-446655440001", + "check_in_service": true + } + }' +``` + +--- + +### 4.2 批量查询任务结果 + +**GET** `/task/async/results` + +根据一组任务 ID 批量查询状态和结果,适用于轮询多个任务。 + +**Query 参数** + +| 参数 | 类型 | 必填 | 说明 | +| :--------- | :----- | :--- | :----------------------------------- | +| `task_ids` | string | 是 | 逗号分隔的 UUID 列表,最少 1 个 | + +**请求示例** + +``` +GET /task/async/results?task_ids=123e4567-e89b-12d3-a456-426614174000,223e4567-e89b-12d3-a456-426614174001 +``` + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "query completed", + "payload": { + "total": 2, + "tasks": [ + { + "task_id": "123e4567-e89b-12d3-a456-426614174000", + "task_type": "TOPOLOGY_ANALYSIS", + "status": "RUNNING", + "progress": 50, + "created_at": 1741846200 + }, + { + "task_id": "223e4567-e89b-12d3-a456-426614174001", + "task_type": "PERFORMANCE_ANALYSIS", + "status": "COMPLETED", + "progress": 100, + "created_at": 1741846200, + "finished_at": 1741846260, + "result": { + "components_analyzed": 5 + } + } + ] + } +} +``` + +**失败响应** + +| 场景 | code | msg | +| :----------------- | :--- | :-------------------------- | +| 缺少 task_ids | 4001 | task_ids parameter is required | +| UUID 格式不合法 | 4001 | invalid task ID format | +| 数据库连接失败 | 5000 | database connection error | +| 查询失败 | 5000 | failed to query tasks | + +**curl 示例** + +```bash +curl "http://localhost:8080/task/async/results?task_ids=123e4567-e89b-12d3-a456-426614174000" +``` + +--- + +### 4.3 查询单个任务详情 + +**GET** `/task/async/{task_id}` + +查询单个任务的完整信息,包含结果或错误详情。 + +**路径参数** + +| 参数 | 类型 | 必填 | 说明 | +| :-------- | :----- | :--- | :-------------- | +| `task_id` | string | 是 | 任务 UUID | + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "query completed", + "payload": { + "task_id": "123e4567-e89b-12d3-a456-426614174000", + "task_type": "TOPOLOGY_ANALYSIS", + "status": "COMPLETED", + "progress": 100, + "created_at": 1741846200, + "finished_at": 1741846260, + "result": { + "path_exists": true, + "path_length": 3, + "path_nodes": ["comp-001", "comp-005", "comp-999"] + } + } +} +``` + +任务失败时 payload 附带错误信息: + +```json +{ + "code": 2000, + "msg": "query completed", + "payload": { + "task_id": "123e4567-e89b-12d3-a456-426614174000", + "task_type": "TOPOLOGY_ANALYSIS", + "status": "FAILED", + "created_at": 1741846200, + "finished_at": 1741846210, + "error_code": 400102, + "error_message": "Component UUID not found", + "error_detail": { + "component_uuid": "550e8400-0000-0000-0000-000000000000" + } + } +} +``` + +**失败响应** + +| 场景 | code | msg | +| :----------------- | :--- | :-------------------------- | +| 缺少 task_id | 4001 | task_id parameter is required | +| UUID 格式不合法 | 4001 | invalid task ID format | +| 任务不存在 | 404 | task not found | +| 数据库连接失败 | 5000 | database connection error | + +**curl 示例** + +```bash +curl "http://localhost:8080/task/async/123e4567-e89b-12d3-a456-426614174000" +``` + +--- + +### 4.4 取消异步任务 + +**POST** `/task/async/{task_id}/cancel` + +取消指定任务。**仅 `SUBMITTED`(排队中)状态的任务可以被取消**,已开始执行(`RUNNING`)或已结束的任务无法取消。 + +取消后任务状态变为 `FAILED`,错误码 `40003`,错误信息 `task cancelled by user`。 + +**路径参数** + +| 参数 | 类型 | 必填 | 说明 | +| :-------- | :----- | :--- | :-------- | +| `task_id` | string | 是 | 任务 UUID | + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "task cancelled successfully" +} +``` + +**失败响应** + +| 场景 | code | msg | +| :----------------------- | :--- | :--------------------------------------------------------- | +| 缺少 task_id | 400 | task_id parameter is required | +| UUID 格式不合法 | 400 | invalid task ID format | +| 任务不存在 | 404 | task not found | +| 任务已执行或已完成 | 400 | task cannot be cancelled (already running or completed) | +| 数据库连接失败 | 500 | database connection error | +| 取消操作失败 | 500 | failed to cancel task | + +**curl 示例** + +```bash +curl -X POST "http://localhost:8080/task/async/123e4567-e89b-12d3-a456-426614174000/cancel" +``` + +--- + +### 4.5 内部接口:更新任务进度 + +**POST** `/task/internal/async/progress` + +由 Worker 内部调用,更新任务执行进度(0-100)。 + +**请求体** + +```json +{ + "task_id": "123e4567-e89b-12d3-a456-426614174000", + "progress": 75 +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :--------- | :----- | :--- | :---------------- | +| `task_id` | string | 是 | 任务 UUID | +| `progress` | int | 是 | 进度值,范围 0-100 | + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "task progress updated successfully", + "payload": null +} +``` + +--- + +### 4.6 内部接口:更新任务状态 + +**POST** `/task/internal/async/status` + +由 Worker 内部调用,更新任务状态。当状态更新为 `COMPLETED` 或 `FAILED` 时,同步写入 `finished_at` 时间戳。 + +**请求体** + +```json +{ + "task_id": "123e4567-e89b-12d3-a456-426614174000", + "status": "RUNNING", + "timestamp": 1741846205 +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :---------- | :----- | :--- | :--------------------------------------- | +| `task_id` | string | 是 | 任务 UUID | +| `status` | string | 是 | 目标状态,枚举值见 §5.2 | +| `timestamp` | int64 | 是 | 状态变更时间戳(Unix 秒) | + +**成功响应** `200` + +```json +{ + "code": 2000, + "msg": "task status updated successfully", + "payload": null +} +``` + +--- + +## 5. 数据结构参考 + +### 5.1 任务类型(task_type) + +| 枚举值 | 描述 | +| :--------------------- | :----------- | +| `TOPOLOGY_ANALYSIS` | 拓扑连通性分析 | +| `PERFORMANCE_ANALYSIS` | 性能分析 | +| `EVENT_ANALYSIS` | 事件分析 | +| `BATCH_IMPORT` | 批量数据导入 | +| `TEST` | 测试任务(系统验证用) | + +### 5.2 任务状态(status) + +| 枚举值 | 描述 | 可转换至 | +| :---------- | :----------------- | :------------------------------ | +| `SUBMITTED` | 已提交至队列 | `RUNNING`, `FAILED`(取消) | +| `RUNNING` | 正在执行 | `COMPLETED`, `FAILED` | +| `COMPLETED` | 执行成功 | — | +| `FAILED` | 执行失败或被取消 | — | + +### 5.3 各任务类型的 params 结构 + +#### TOPOLOGY_ANALYSIS — 拓扑连通性分析 + +分析两个元件之间是否存在连通路径。 + +```json +{ + "start_component_uuid": "550e8400-e29b-41d4-a716-446655440000", + "end_component_uuid": "550e8400-e29b-41d4-a716-446655440001", + "check_in_service": true +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :--------------------- | :------ | :--- | :------------------------------------- | +| `start_component_uuid` | string | 是 | 起始元件 UUID | +| `end_component_uuid` | string | 是 | 目标元件 UUID | +| `check_in_service` | boolean | 否 | 是否只检查投运状态元件,默认 `true` | + +#### PERFORMANCE_ANALYSIS — 性能分析 + +```json +{ + "component_ids": ["comp-001", "comp-002"], + "time_range": { + "start": "2026-03-01T00:00:00Z", + "end": "2026-03-02T00:00:00Z" + } +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :--------------- | :------- | :--- | :------------------ | +| `component_ids` | []string | 是 | 待分析的元件 ID 列表(至少 1 个) | +| `time_range` | object | 否 | 分析时间范围 | + +#### EVENT_ANALYSIS — 事件分析 + +```json +{ + "event_type": "MOTOR_START", + "start_time": "2026-03-01T00:00:00Z", + "end_time": "2026-03-02T00:00:00Z", + "components": ["comp-001", "comp-002"] +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :------------ | :------- | :--- | :------------------ | +| `event_type` | string | 是 | 事件类型 | +| `start_time` | string | 否 | 事件起始时间(RFC3339) | +| `end_time` | string | 否 | 事件截止时间(RFC3339) | +| `components` | []string | 否 | 关联的元件列表 | + +#### BATCH_IMPORT — 批量导入 + +```json +{ + "file_path": "/data/import/model.csv", + "file_type": "CSV", + "options": { + "overwrite": false, + "validate": true, + "notify_user": true + } +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :--------------------- | :------ | :--- | :-------------------------------- | +| `file_path` | string | 是 | 导入文件路径 | +| `file_type` | string | 否 | 文件类型:`CSV`, `JSON`, `XML` | +| `options.overwrite` | boolean | 否 | 是否覆盖已有数据,默认 `false` | +| `options.validate` | boolean | 否 | 是否校验数据,默认 `true` | +| `options.notify_user` | boolean | 否 | 完成后是否通知用户,默认 `true` | + +#### TEST — 测试任务 + +```json +{ + "sleep_duration": 30 +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| :--------------- | :--- | :--- | :---------------------------------------------- | +| `sleep_duration` | int | 否 | 模拟执行耗时(秒),默认 60,最大 3600 | + +### 5.4 任务结果对象(AsyncTaskResult) + +| 字段 | 类型 | 说明 | +| :-------------- | :----- | :--------------------------------------------------- | +| `task_id` | string | 任务 UUID | +| `task_type` | string | 任务类型 | +| `status` | string | 当前状态 | +| `progress` | int | 进度(0-100),仅 `RUNNING` 时返回 | +| `created_at` | int64 | 创建时间戳(Unix 秒) | +| `finished_at` | int64 | 完成时间戳(Unix 秒),仅 `COMPLETED`/`FAILED` 返回 | +| `result` | object | 任务结果,仅 `COMPLETED` 时返回 | +| `error_code` | int | 错误码,仅 `FAILED` 时返回 | +| `error_message` | string | 错误描述,仅 `FAILED` 时返回 | +| `error_detail` | object | 错误详情,仅 `FAILED` 时返回 | + +--- + +## 6. 典型调用流程 + +``` +创建任务 + └─ POST /task/async + └─ 返回 task_id + +轮询状态(建议间隔 2-5 秒) + └─ GET /task/async/{task_id} + ├─ status=SUBMITTED → 继续等待 + ├─ status=RUNNING → 查看 progress + ├─ status=COMPLETED → 读取 result 字段 + └─ status=FAILED → 读取 error_code / error_message + +如需中止(仅 SUBMITTED 状态有效) + └─ POST /task/async/{task_id}/cancel +``` + +--- + +## 7. 队列配置参考 + +| 配置项 | 值 | +| :------------- | :-------------------------- | +| Exchange | `modelrt.tasks.exchange` | +| Queue | `modelrt.tasks.queue` | +| Routing Key | `modelrt.task` | +| 优先级范围 | 0–10(默认 5) | +| 消息 TTL | 24 小时 | +| 最大重试次数 | 3 次 | +| 重试初始延迟 | 1 秒(指数退避,最大 5 分钟)| + +--- + +**文档版本**: 1.1 +**最后更新**: 2026-04-28 +**相关文档**: [异步任务系统设计文档](./async_task_system.md) diff --git a/handler/async_task_cancel_handler.go b/handler/async_task_cancel_handler.go index 9b99b52..6fa0bc3 100644 --- a/handler/async_task_cancel_handler.go +++ b/handler/async_task_cancel_handler.go @@ -2,12 +2,11 @@ package handler import ( - "net/http" "time" + "modelRT/constants" "modelRT/database" "modelRT/logger" - "modelRT/network" "modelRT/orm" "github.com/gin-gonic/gin" @@ -23,97 +22,65 @@ import ( // @Produce json // @Param task_id path string true "任务ID" // @Success 200 {object} network.SuccessResponse "任务取消成功" -// @Failure 400 {object} network.FailureResponse "请求参数错误或任务无法取消" -// @Failure 404 {object} network.FailureResponse "任务不存在" -// @Failure 500 {object} network.FailureResponse "服务器内部错误" +// @Failure 200 {object} network.FailureResponse "请求参数错误或任务无法取消" // @Router /task/async/{task_id}/cancel [post] func AsyncTaskCancelHandler(c *gin.Context) { ctx := c.Request.Context() - // Parse task ID from path parameter taskIDStr := c.Param("task_id") if taskIDStr == "" { logger.Error(ctx, "task_id parameter is required") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "task_id parameter is required", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task_id parameter is required", nil) return } taskID, err := uuid.FromString(taskIDStr) if err != nil { logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid task ID format", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil) return } pgClient := database.GetPostgresDBClient() if pgClient == nil { logger.Error(ctx, "database connection not found in context") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "database connection error", - }) + renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil) return } - // Query task from database asyncTask, err := database.GetAsyncTaskByID(ctx, pgClient, taskID) if err != nil { if err == gorm.ErrRecordNotFound { logger.Error(ctx, "async task not found", "task_id", taskID) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusNotFound, - Msg: "task not found", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task not found", nil) return } logger.Error(ctx, "failed to query async task from database", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to query task", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to query task", nil) return } - // Check if task can be cancelled (only SUBMITTED tasks can be cancelled) if asyncTask.Status != orm.AsyncTaskStatusSubmitted { logger.Error(ctx, "task cannot be cancelled", "task_id", taskID, "status", asyncTask.Status) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "task cannot be cancelled (already running or completed)", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task cannot be cancelled, already running or completed", nil) return } - // Update task status to failed with cancellation reason timestamp := time.Now().Unix() err = database.FailAsyncTask(ctx, pgClient, taskID, timestamp) if err != nil { logger.Error(ctx, "failed to cancel async task", "task_id", taskID, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to cancel task", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to cancel task", nil) return } - // Update task result with cancellation error - err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40003, "task cancelled by user", orm.JSONMap{ + err = database.UpdateAsyncTaskResultWithError(ctx, pgClient, taskID, 40009, "task cancelled by user", orm.JSONMap{ "cancelled_at": timestamp, "cancelled_by": "user", }) if err != nil { logger.Error(ctx, "failed to update task result with cancellation error", "task_id", taskID, "error", err) - // Continue anyway since task is already marked as failed } - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: 2000, - Msg: "task cancelled successfully", - }) + renderRespSuccess(c, constants.RespCodeSuccess, "task cancelled successfully", nil) } diff --git a/handler/async_task_progress_update_handler.go b/handler/async_task_progress_update_handler.go index ad3fe9b..08276ce 100644 --- a/handler/async_task_progress_update_handler.go +++ b/handler/async_task_progress_update_handler.go @@ -2,8 +2,7 @@ package handler import ( - "net/http" - + "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" @@ -18,37 +17,23 @@ func AsyncTaskProgressUpdateHandler(c *gin.Context) { if err := c.ShouldBindJSON(&request); err != nil { logger.Error(ctx, "failed to unmarshal async task progress update request", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid request parameters", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid request parameters", nil) return } pgClient := database.GetPostgresDBClient() if pgClient == nil { logger.Error(ctx, "database connection not found in context") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "database connection error", - }) + renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil) return } - // Update task progress err := database.UpdateAsyncTaskProgress(ctx, pgClient, request.TaskID, request.Progress) if err != nil { logger.Error(ctx, "failed to update async task progress", "task_id", request.TaskID, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to update task progress", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to update task progress", nil) return } - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: 2000, - Msg: "task progress updated successfully", - Payload: nil, - }) + renderRespSuccess(c, constants.RespCodeSuccess, "task progress updated successfully", nil) } diff --git a/handler/async_task_result_detail_handler.go b/handler/async_task_result_detail_handler.go index 1494945..2d4174a 100644 --- a/handler/async_task_result_detail_handler.go +++ b/handler/async_task_result_detail_handler.go @@ -2,8 +2,7 @@ package handler import ( - "net/http" - + "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" @@ -21,75 +20,51 @@ import ( // @Produce json // @Param task_id path string true "任务ID" // @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResult} "查询成功" -// @Failure 400 {object} network.FailureResponse "请求参数错误" -// @Failure 404 {object} network.FailureResponse "任务不存在" -// @Failure 500 {object} network.FailureResponse "服务器内部错误" +// @Failure 200 {object} network.FailureResponse "请求参数错误" // @Router /task/async/{task_id} [get] func AsyncTaskResultDetailHandler(c *gin.Context) { ctx := c.Request.Context() - // Parse task ID from path parameter taskIDStr := c.Param("task_id") if taskIDStr == "" { logger.Error(ctx, "task_id parameter is required") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "task_id parameter is required", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task_id parameter is required", nil) return } taskID, err := uuid.FromString(taskIDStr) if err != nil { logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid task ID format", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil) return } pgClient := database.GetPostgresDBClient() if pgClient == nil { logger.Error(ctx, "database connection not found in context") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "database connection error", - }) + renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil) return } - // Query task from database asyncTask, err := database.GetAsyncTaskByID(ctx, pgClient, taskID) if err != nil { if err == gorm.ErrRecordNotFound { logger.Error(ctx, "async task not found", "task_id", taskID) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusNotFound, - Msg: "task not found", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task not found", nil) return } logger.Error(ctx, "failed to query async task from database", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to query task", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to query task", nil) return } - // Query task result from database taskResult, err := database.GetAsyncTaskResult(ctx, pgClient, taskID) if err != nil { logger.Error(ctx, "failed to query async task result from database", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to query task result", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to query task result", nil) return } - // Convert to response format responseTask := network.AsyncTaskResult{ TaskID: asyncTask.TaskID, TaskType: string(asyncTask.TaskType), @@ -99,7 +74,6 @@ func AsyncTaskResultDetailHandler(c *gin.Context) { Progress: asyncTask.Progress, } - // Add result or error information if available if taskResult != nil { if taskResult.Result != nil { responseTask.Result = map[string]any(taskResult.Result) @@ -115,10 +89,5 @@ func AsyncTaskResultDetailHandler(c *gin.Context) { } } - // Return success response - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: 2000, - Msg: "query completed", - Payload: responseTask, - }) + renderRespSuccess(c, constants.RespCodeSuccess, "query completed", responseTask) } diff --git a/handler/async_task_result_query_handler.go b/handler/async_task_result_query_handler.go index 12c7d67..e9b08ae 100644 --- a/handler/async_task_result_query_handler.go +++ b/handler/async_task_result_query_handler.go @@ -2,9 +2,9 @@ package handler import ( - "net/http" "strings" + "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" @@ -22,34 +22,25 @@ import ( // @Produce json // @Param task_ids query string true "任务ID列表,用逗号分隔" // @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResultQueryResponse} "查询成功" -// @Failure 400 {object} network.FailureResponse "请求参数错误" -// @Failure 500 {object} network.FailureResponse "服务器内部错误" +// @Failure 200 {object} network.FailureResponse "请求参数错误" // @Router /task/async/results [get] func AsyncTaskResultQueryHandler(c *gin.Context) { ctx := c.Request.Context() - // Parse task IDs from query parameter taskIDsParam := c.Query("task_ids") if taskIDsParam == "" { logger.Error(ctx, "task_ids parameter is required") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "task_ids parameter is required", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "task_ids parameter is required", nil) return } - // Parse comma-separated task IDs var taskIDs []uuid.UUID taskIDStrs := splitCommaSeparated(taskIDsParam) for _, taskIDStr := range taskIDStrs { taskID, err := uuid.FromString(taskIDStr) if err != nil { logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid task ID format", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task ID format", nil) return } taskIDs = append(taskIDs, taskID) @@ -57,52 +48,36 @@ func AsyncTaskResultQueryHandler(c *gin.Context) { if len(taskIDs) == 0 { logger.Error(ctx, "no valid task IDs provided") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "no valid task IDs provided", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "no valid task IDs provided", nil) return } pgClient := database.GetPostgresDBClient() if pgClient == nil { logger.Error(ctx, "database connection not found in context") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "database connection error", - }) + renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil) return } - // Query tasks from database asyncTasks, err := database.GetAsyncTasksByIDs(ctx, pgClient, taskIDs) if err != nil { logger.Error(ctx, "failed to query async tasks from database", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to query tasks", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to query tasks", nil) return } - // Query task results from database taskResults, err := database.GetAsyncTaskResults(ctx, pgClient, taskIDs) if err != nil { logger.Error(ctx, "failed to query async task results from database", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to query task results", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to query task results", nil) return } - // Create a map of task results for easy lookup taskResultMap := make(map[uuid.UUID]orm.AsyncTaskResult) for _, result := range taskResults { taskResultMap[result.TaskID] = result } - // Convert to response format var responseTasks []network.AsyncTaskResult for _, asyncTask := range asyncTasks { taskResult := network.AsyncTaskResult{ @@ -114,7 +89,6 @@ func AsyncTaskResultQueryHandler(c *gin.Context) { Progress: asyncTask.Progress, } - // Add result or error information if available if result, exists := taskResultMap[asyncTask.TaskID]; exists { if result.Result != nil { taskResult.Result = map[string]any(result.Result) @@ -133,14 +107,9 @@ func AsyncTaskResultQueryHandler(c *gin.Context) { responseTasks = append(responseTasks, taskResult) } - // Return success response - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: 2000, - Msg: "query completed", - Payload: network.AsyncTaskResultQueryResponse{ - Total: len(responseTasks), - Tasks: responseTasks, - }, + renderRespSuccess(c, constants.RespCodeSuccess, "query completed", network.AsyncTaskResultQueryResponse{ + Total: len(responseTasks), + Tasks: responseTasks, }) } diff --git a/handler/async_task_status_update_handler.go b/handler/async_task_status_update_handler.go index caeab2b..daf3c72 100644 --- a/handler/async_task_status_update_handler.go +++ b/handler/async_task_status_update_handler.go @@ -2,8 +2,7 @@ package handler import ( - "net/http" - + "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" @@ -19,14 +18,10 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) { if err := c.ShouldBindJSON(&request); err != nil { logger.Error(ctx, "failed to unmarshal async task status update request", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid request parameters", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid request parameters", nil) return } - // Validate status validStatus := map[string]bool{ string(orm.AsyncTaskStatusSubmitted): true, string(orm.AsyncTaskStatusRunning): true, @@ -36,36 +31,25 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) { if !validStatus[request.Status] { logger.Error(ctx, "invalid task status", "status", request.Status) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "invalid task status", - }) + renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task status", nil) return } pgClient := database.GetPostgresDBClient() if pgClient == nil { logger.Error(ctx, "database connection not found in context") - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "database connection error", - }) + renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil) return } - // Update task status status := orm.AsyncTaskStatus(request.Status) err := database.UpdateAsyncTaskStatus(ctx, pgClient, request.TaskID, status) if err != nil { logger.Error(ctx, "failed to update async task status", "task_id", request.TaskID, "status", request.Status, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to update task status", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to update task status", nil) return } - // If task is completed or failed, update finished_at timestamp if request.Status == string(orm.AsyncTaskStatusCompleted) { err = database.CompleteAsyncTask(ctx, pgClient, request.TaskID, request.Timestamp) } else if request.Status == string(orm.AsyncTaskStatusFailed) { @@ -74,16 +58,9 @@ func AsyncTaskStatusUpdateHandler(c *gin.Context) { if err != nil { logger.Error(ctx, "failed to update async task completion timestamp", "task_id", request.TaskID, "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "failed to update task completion timestamp", - }) + renderRespFailure(c, constants.RespCodeServerError, "failed to update task completion timestamp", nil) return } - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: 2000, - Msg: "task status updated successfully", - Payload: nil, - }) + renderRespSuccess(c, constants.RespCodeSuccess, "task status updated successfully", nil) } diff --git a/task/base_task.go b/task/base_task.go index 271d043..c49b897 100644 --- a/task/base_task.go +++ b/task/base_task.go @@ -9,8 +9,8 @@ import ( "gorm.io/gorm" ) -// TaskParams defines the interface for task-specific parameters -type TaskParams interface { +// Params defines the interface for task-specific parameters +type Params interface { Validate() error GetType() UnifiedTaskType ToMap() map[string]interface{} @@ -20,12 +20,12 @@ type TaskParams interface { // BaseTask provides common functionality for all task implementations type BaseTask struct { taskType UnifiedTaskType - params TaskParams + params Params name string } // NewBaseTask creates a new BaseTask instance -func NewBaseTask(taskType UnifiedTaskType, params TaskParams, name string) *BaseTask { +func NewBaseTask(taskType UnifiedTaskType, params Params, name string) *BaseTask { return &BaseTask{ taskType: taskType, params: params, @@ -37,7 +37,7 @@ func (t *BaseTask) GetType() UnifiedTaskType { return t.taskType } -func (t *BaseTask) GetParams() TaskParams { +func (t *BaseTask) GetParams() Params { return t.params } diff --git a/task/handler_factory.go b/task/handler_factory.go index 7fcc174..b705e27 100644 --- a/task/handler_factory.go +++ b/task/handler_factory.go @@ -158,7 +158,6 @@ func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, // check the start node itself before BFS if !inServiceMap[startComponentUUID] { - fmt.Println(11111) return persistTopologyResult(ctx, db, taskID, startComponentUUID, endComponentUUID, checkInService, false, nil, &startComponentUUID) } @@ -221,7 +220,6 @@ func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, // parseTopologyAnalysisParams extracts and validates the three required fields. // check_in_service defaults to true when absent. func parseTopologyAnalysisParams(params map[string]any) (startID, endID uuid.UUID, checkInService bool, err error) { - fmt.Printf("params:%+v\n", params) startStr, ok := params["start_component_uuid"].(string) if !ok || startStr == "" { err = fmt.Errorf("missing or invalid start_component_uuid")