From adcc8c6c915a4142add8af5ae12ca709f09a94fa Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 13 Mar 2026 11:45:22 +0800 Subject: [PATCH] add code of async task system --- database/async_task_operations.go | 321 ++++++++++++++++++++++++++++++ database/postgres_init.go | 11 + network/async_task_request.go | 95 +++++++++ 3 files changed, 427 insertions(+) create mode 100644 database/async_task_operations.go create mode 100644 network/async_task_request.go diff --git a/database/async_task_operations.go b/database/async_task_operations.go new file mode 100644 index 0000000..991e77b --- /dev/null +++ b/database/async_task_operations.go @@ -0,0 +1,321 @@ +// Package database define database operation functions +package database + +import ( + "context" + "time" + + "modelRT/orm" + + "github.com/gofrs/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// CreateAsyncTask creates a new async task in the database +func CreateAsyncTask(ctx context.Context, tx *gorm.DB, taskType orm.AsyncTaskType, params orm.JSONMap) (*orm.AsyncTask, error) { + taskID, err := uuid.NewV4() + if err != nil { + return nil, err + } + + task := &orm.AsyncTask{ + TaskID: taskID, + TaskType: taskType, + Status: orm.AsyncTaskStatusSubmitted, + Params: params, + CreatedAt: time.Now().Unix(), + } + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx).Create(task) + if result.Error != nil { + return nil, result.Error + } + + return task, nil +} + +// GetAsyncTaskByID retrieves an async task by its ID +func GetAsyncTaskByID(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTask, error) { + var task orm.AsyncTask + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("task_id = ?", taskID). + Clauses(clause.Locking{Strength: "UPDATE"}). + First(&task) + + if result.Error != nil { + return nil, result.Error + } + + return &task, nil +} + +// GetAsyncTasksByIDs retrieves multiple async tasks by their IDs +func GetAsyncTasksByIDs(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTask, error) { + var tasks []orm.AsyncTask + + if len(taskIDs) == 0 { + return tasks, nil + } + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("task_id IN ?", taskIDs). + Clauses(clause.Locking{Strength: "UPDATE"}). + Find(&tasks) + + if result.Error != nil { + return nil, result.Error + } + + return tasks, nil +} + +// UpdateAsyncTaskStatus updates the status of an async task +func UpdateAsyncTaskStatus(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, status orm.AsyncTaskStatus) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Model(&orm.AsyncTask{}). + Where("task_id = ?", taskID). + Update("status", status) + + return result.Error +} + +// UpdateAsyncTaskProgress updates the progress of an async task +func UpdateAsyncTaskProgress(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, progress int) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Model(&orm.AsyncTask{}). + Where("task_id = ?", taskID). + Update("progress", progress) + + return result.Error +} + +// CompleteAsyncTask marks an async task as completed with timestamp +func CompleteAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Model(&orm.AsyncTask{}). + Where("task_id = ?", taskID). + Updates(map[string]any{ + "status": orm.AsyncTaskStatusCompleted, + "finished_at": timestamp, + "progress": 100, + }) + + return result.Error +} + +// FailAsyncTask marks an async task as failed with timestamp +func FailAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Model(&orm.AsyncTask{}). + Where("task_id = ?", taskID). + Updates(map[string]any{ + "status": orm.AsyncTaskStatusFailed, + "finished_at": timestamp, + }) + + return result.Error +} + +// CreateAsyncTaskResult creates a result record for an async task +func CreateAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error { + taskResult := &orm.AsyncTaskResult{ + TaskID: taskID, + Result: result, + } + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + resultOp := tx.WithContext(cancelCtx).Create(taskResult) + return resultOp.Error +} + +// UpdateAsyncTaskResultWithError updates a task result with error information +func UpdateAsyncTaskResultWithError(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, code int, message string, detail orm.JSONMap) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // Update with error information + result := tx.WithContext(cancelCtx). + Model(&orm.AsyncTaskResult{}). + Where("task_id = ?", taskID). + Updates(map[string]any{ + "error_code": code, + "error_message": message, + "error_detail": detail, + "result": nil, + }) + + return result.Error +} + +// UpdateAsyncTaskResultWithSuccess updates a task result with success information +func UpdateAsyncTaskResultWithSuccess(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // First try to update existing record, if not found create new one + existingResult := tx.WithContext(cancelCtx). + Where("task_id = ?", taskID). + FirstOrCreate(&orm.AsyncTaskResult{TaskID: taskID}) + + if existingResult.Error != nil { + return existingResult.Error + } + + // Update with success information + updateResult := tx.WithContext(cancelCtx). + Model(&orm.AsyncTaskResult{}). + Where("task_id = ?", taskID). + Updates(map[string]any{ + "result": result, + "error_code": nil, + "error_message": nil, + "error_detail": nil, + }) + + return updateResult.Error +} + +// GetAsyncTaskResult retrieves the result of an async task +func GetAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTaskResult, error) { + var taskResult orm.AsyncTaskResult + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("task_id = ?", taskID). + First(&taskResult) + + if result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, nil + } + return nil, result.Error + } + + return &taskResult, nil +} + +// GetAsyncTaskResults retrieves multiple task results by task IDs +func GetAsyncTaskResults(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTaskResult, error) { + var taskResults []orm.AsyncTaskResult + + if len(taskIDs) == 0 { + return taskResults, nil + } + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("task_id IN ?", taskIDs). + Find(&taskResults) + + if result.Error != nil { + return nil, result.Error + } + + return taskResults, nil +} + +// GetPendingTasks retrieves pending tasks (submitted but not yet running/completed) +func GetPendingTasks(ctx context.Context, tx *gorm.DB, limit int) ([]orm.AsyncTask, error) { + var tasks []orm.AsyncTask + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("status = ?", orm.AsyncTaskStatusSubmitted). + Order("created_at ASC"). + Limit(limit). + Find(&tasks) + + if result.Error != nil { + return nil, result.Error + } + + return tasks, nil +} + +// GetTasksByStatus retrieves tasks by status +func GetTasksByStatus(ctx context.Context, tx *gorm.DB, status orm.AsyncTaskStatus, limit int) ([]orm.AsyncTask, error) { + var tasks []orm.AsyncTask + + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("status = ?", status). + Order("created_at ASC"). + Limit(limit). + Find(&tasks) + + if result.Error != nil { + return nil, result.Error + } + + return tasks, nil +} + +// DeleteOldTasks deletes tasks older than the specified timestamp +func DeleteOldTasks(ctx context.Context, tx *gorm.DB, olderThan int64) error { + // ctx timeout judgment + cancelCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + // First delete task results + result := tx.WithContext(cancelCtx). + Where("task_id IN (SELECT task_id FROM async_task WHERE created_at < ?)", olderThan). + Delete(&orm.AsyncTaskResult{}) + + if result.Error != nil { + return result.Error + } + + // Then delete tasks + result = tx.WithContext(cancelCtx). + Where("created_at < ?", olderThan). + Delete(&orm.AsyncTask{}) + + return result.Error +} \ No newline at end of file diff --git a/database/postgres_init.go b/database/postgres_init.go index fdba0ee..d2babd4 100644 --- a/database/postgres_init.go +++ b/database/postgres_init.go @@ -5,6 +5,7 @@ import ( "sync" "modelRT/logger" + "modelRT/orm" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -34,5 +35,15 @@ func initPostgresDBClient(PostgresDBURI string) *gorm.DB { if err != nil { panic(err) } + + // Auto migrate async task tables + err = db.AutoMigrate( + &orm.AsyncTask{}, + &orm.AsyncTaskResult{}, + ) + if err != nil { + panic(err) + } + return db } diff --git a/network/async_task_request.go b/network/async_task_request.go new file mode 100644 index 0000000..9534b76 --- /dev/null +++ b/network/async_task_request.go @@ -0,0 +1,95 @@ +// Package network define struct of network operation +package network + +import ( + "time" + + "github.com/gofrs/uuid" +) + +// AsyncTaskCreateRequest defines the request structure for creating an asynchronous task +type AsyncTaskCreateRequest struct { + // required: true + // enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT + TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"异步任务类型"` + // required: true + Params map[string]interface{} `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"` +} + +// AsyncTaskCreateResponse defines the response structure for creating an asynchronous task +type AsyncTaskCreateResponse struct { + TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"` +} + +// AsyncTaskResultQueryRequest defines the request structure for querying task results +type AsyncTaskResultQueryRequest struct { + // required: true + TaskIDs []uuid.UUID `json:"task_ids" swaggertype:"array,string" example:"[\"123e4567-e89b-12d3-a456-426614174000\",\"223e4567-e89b-12d3-a456-426614174001\"]" description:"任务ID列表"` +} + +// AsyncTaskResult defines the structure for a single task result +type AsyncTaskResult struct { + TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"` + TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"` + Status string `json:"status" example:"COMPLETED" description:"任务状态:SUBMITTED, RUNNING, COMPLETED, FAILED"` + Progress *int `json:"progress,omitempty" example:"65" description:"任务进度(0-100),仅当状态为RUNNING时返回"` + CreatedAt int64 `json:"created_at" example:"1741846200" description:"任务创建时间戳"` + FinishedAt *int64 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳,仅当状态为COMPLETED或FAILED时返回"` + Result map[string]interface{} `json:"result,omitempty" swaggertype:"object" description:"任务结果,仅当状态为COMPLETED时返回"` + ErrorCode *int `json:"error_code,omitempty" example:"400102" description:"错误码,仅当状态为FAILED时返回"` + ErrorMessage *string `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息,仅当状态为FAILED时返回"` + ErrorDetail map[string]interface{} `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情,仅当状态为FAILED时返回"` +} + +// AsyncTaskResultQueryResponse defines the response structure for querying task results +type AsyncTaskResultQueryResponse struct { + Total int `json:"total" example:"3" description:"查询的任务总数"` + Tasks []AsyncTaskResult `json:"tasks" description:"任务结果列表"` +} + +// AsyncTaskProgressUpdate defines the structure for task progress update +type AsyncTaskProgressUpdate struct { + TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"` + Progress int `json:"progress" example:"50" description:"任务进度(0-100)"` +} + +// AsyncTaskStatusUpdate defines the structure for task status update +type AsyncTaskStatusUpdate struct { + TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"` + Status string `json:"status" example:"RUNNING" description:"任务状态:SUBMITTED, RUNNING, COMPLETED, FAILED"` + Timestamp int64 `json:"timestamp" example:"1741846205" description:"状态更新时间戳"` +} + +// TopologyAnalysisParams defines the parameters for topology analysis task +type TopologyAnalysisParams struct { + StartUUID string `json:"start_uuid" example:"comp-001" description:"起始元件UUID"` + EndUUID string `json:"end_uuid" example:"comp-999" description:"目标元件UUID"` +} + +// PerformanceAnalysisParams defines the parameters for performance analysis task +type PerformanceAnalysisParams struct { + ComponentIDs []string `json:"component_ids" example:"[\"comp-001\",\"comp-002\"]" description:"需要分析的元件ID列表"` + TimeRange struct { + Start time.Time `json:"start" example:"2026-03-01T00:00:00Z" description:"分析开始时间"` + End time.Time `json:"end" example:"2026-03-02T00:00:00Z" description:"分析结束时间"` + } `json:"time_range" description:"分析时间范围"` +} + +// EventAnalysisParams defines the parameters for event analysis task +type EventAnalysisParams struct { + EventType string `json:"event_type" example:"MOTOR_START" description:"事件类型"` + StartTime time.Time `json:"start_time" example:"2026-03-01T00:00:00Z" description:"事件开始时间"` + EndTime time.Time `json:"end_time" example:"2026-03-02T00:00:00Z" description:"事件结束时间"` + Components []string `json:"components,omitempty" example:"[\"comp-001\",\"comp-002\"]" description:"关联的元件列表"` +} + +// BatchImportParams defines the parameters for batch import task +type BatchImportParams struct { + FilePath string `json:"file_path" example:"/data/import/model.csv" description:"导入文件路径"` + FileType string `json:"file_type" example:"CSV" description:"文件类型:CSV, JSON, XML"` + Options struct { + Overwrite bool `json:"overwrite" example:"false" description:"是否覆盖现有数据"` + Validate bool `json:"validate" example:"true" description:"是否进行数据验证"` + NotifyUser bool `json:"notify_user" example:"true" description:"是否通知用户"` + } `json:"options" description:"导入选项"` +}