add code of async task system

This commit is contained in:
douxu 2026-03-13 11:45:22 +08:00
parent 6e0d2186d8
commit adcc8c6c91
3 changed files with 427 additions and 0 deletions

View File

@ -0,0 +1,321 @@
// Package database define database operation functions
package database
import (
"context"
"time"
"modelRT/orm"
"github.com/gofrs/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// CreateAsyncTask creates a new async task in the database
func CreateAsyncTask(ctx context.Context, tx *gorm.DB, taskType orm.AsyncTaskType, params orm.JSONMap) (*orm.AsyncTask, error) {
taskID, err := uuid.NewV4()
if err != nil {
return nil, err
}
task := &orm.AsyncTask{
TaskID: taskID,
TaskType: taskType,
Status: orm.AsyncTaskStatusSubmitted,
Params: params,
CreatedAt: time.Now().Unix(),
}
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).Create(task)
if result.Error != nil {
return nil, result.Error
}
return task, nil
}
// GetAsyncTaskByID retrieves an async task by its ID
func GetAsyncTaskByID(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTask, error) {
var task orm.AsyncTask
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("task_id = ?", taskID).
Clauses(clause.Locking{Strength: "UPDATE"}).
First(&task)
if result.Error != nil {
return nil, result.Error
}
return &task, nil
}
// GetAsyncTasksByIDs retrieves multiple async tasks by their IDs
func GetAsyncTasksByIDs(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTask, error) {
var tasks []orm.AsyncTask
if len(taskIDs) == 0 {
return tasks, nil
}
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("task_id IN ?", taskIDs).
Clauses(clause.Locking{Strength: "UPDATE"}).
Find(&tasks)
if result.Error != nil {
return nil, result.Error
}
return tasks, nil
}
// UpdateAsyncTaskStatus updates the status of an async task
func UpdateAsyncTaskStatus(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, status orm.AsyncTaskStatus) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Model(&orm.AsyncTask{}).
Where("task_id = ?", taskID).
Update("status", status)
return result.Error
}
// UpdateAsyncTaskProgress updates the progress of an async task
func UpdateAsyncTaskProgress(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, progress int) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Model(&orm.AsyncTask{}).
Where("task_id = ?", taskID).
Update("progress", progress)
return result.Error
}
// CompleteAsyncTask marks an async task as completed with timestamp
func CompleteAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Model(&orm.AsyncTask{}).
Where("task_id = ?", taskID).
Updates(map[string]any{
"status": orm.AsyncTaskStatusCompleted,
"finished_at": timestamp,
"progress": 100,
})
return result.Error
}
// FailAsyncTask marks an async task as failed with timestamp
func FailAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Model(&orm.AsyncTask{}).
Where("task_id = ?", taskID).
Updates(map[string]any{
"status": orm.AsyncTaskStatusFailed,
"finished_at": timestamp,
})
return result.Error
}
// CreateAsyncTaskResult creates a result record for an async task
func CreateAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
taskResult := &orm.AsyncTaskResult{
TaskID: taskID,
Result: result,
}
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
resultOp := tx.WithContext(cancelCtx).Create(taskResult)
return resultOp.Error
}
// UpdateAsyncTaskResultWithError updates a task result with error information
func UpdateAsyncTaskResultWithError(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, code int, message string, detail orm.JSONMap) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// Update with error information
result := tx.WithContext(cancelCtx).
Model(&orm.AsyncTaskResult{}).
Where("task_id = ?", taskID).
Updates(map[string]any{
"error_code": code,
"error_message": message,
"error_detail": detail,
"result": nil,
})
return result.Error
}
// UpdateAsyncTaskResultWithSuccess updates a task result with success information
func UpdateAsyncTaskResultWithSuccess(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// First try to update existing record, if not found create new one
existingResult := tx.WithContext(cancelCtx).
Where("task_id = ?", taskID).
FirstOrCreate(&orm.AsyncTaskResult{TaskID: taskID})
if existingResult.Error != nil {
return existingResult.Error
}
// Update with success information
updateResult := tx.WithContext(cancelCtx).
Model(&orm.AsyncTaskResult{}).
Where("task_id = ?", taskID).
Updates(map[string]any{
"result": result,
"error_code": nil,
"error_message": nil,
"error_detail": nil,
})
return updateResult.Error
}
// GetAsyncTaskResult retrieves the result of an async task
func GetAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTaskResult, error) {
var taskResult orm.AsyncTaskResult
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("task_id = ?", taskID).
First(&taskResult)
if result.Error != nil {
if result.Error == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, result.Error
}
return &taskResult, nil
}
// GetAsyncTaskResults retrieves multiple task results by task IDs
func GetAsyncTaskResults(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTaskResult, error) {
var taskResults []orm.AsyncTaskResult
if len(taskIDs) == 0 {
return taskResults, nil
}
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("task_id IN ?", taskIDs).
Find(&taskResults)
if result.Error != nil {
return nil, result.Error
}
return taskResults, nil
}
// GetPendingTasks retrieves pending tasks (submitted but not yet running/completed)
func GetPendingTasks(ctx context.Context, tx *gorm.DB, limit int) ([]orm.AsyncTask, error) {
var tasks []orm.AsyncTask
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("status = ?", orm.AsyncTaskStatusSubmitted).
Order("created_at ASC").
Limit(limit).
Find(&tasks)
if result.Error != nil {
return nil, result.Error
}
return tasks, nil
}
// GetTasksByStatus retrieves tasks by status
func GetTasksByStatus(ctx context.Context, tx *gorm.DB, status orm.AsyncTaskStatus, limit int) ([]orm.AsyncTask, error) {
var tasks []orm.AsyncTask
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := tx.WithContext(cancelCtx).
Where("status = ?", status).
Order("created_at ASC").
Limit(limit).
Find(&tasks)
if result.Error != nil {
return nil, result.Error
}
return tasks, nil
}
// DeleteOldTasks deletes tasks older than the specified timestamp
func DeleteOldTasks(ctx context.Context, tx *gorm.DB, olderThan int64) error {
// ctx timeout judgment
cancelCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// First delete task results
result := tx.WithContext(cancelCtx).
Where("task_id IN (SELECT task_id FROM async_task WHERE created_at < ?)", olderThan).
Delete(&orm.AsyncTaskResult{})
if result.Error != nil {
return result.Error
}
// Then delete tasks
result = tx.WithContext(cancelCtx).
Where("created_at < ?", olderThan).
Delete(&orm.AsyncTask{})
return result.Error
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"modelRT/logger"
"modelRT/orm"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -34,5 +35,15 @@ func initPostgresDBClient(PostgresDBURI string) *gorm.DB {
if err != nil {
panic(err)
}
// Auto migrate async task tables
err = db.AutoMigrate(
&orm.AsyncTask{},
&orm.AsyncTaskResult{},
)
if err != nil {
panic(err)
}
return db
}

View File

@ -0,0 +1,95 @@
// Package network define struct of network operation
package network
import (
"time"
"github.com/gofrs/uuid"
)
// AsyncTaskCreateRequest defines the request structure for creating an asynchronous task
type AsyncTaskCreateRequest struct {
// required: true
// enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"异步任务类型"`
// required: true
Params map[string]interface{} `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"`
}
// AsyncTaskCreateResponse defines the response structure for creating an asynchronous task
type AsyncTaskCreateResponse struct {
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
}
// AsyncTaskResultQueryRequest defines the request structure for querying task results
type AsyncTaskResultQueryRequest struct {
// required: true
TaskIDs []uuid.UUID `json:"task_ids" swaggertype:"array,string" example:"[\"123e4567-e89b-12d3-a456-426614174000\",\"223e4567-e89b-12d3-a456-426614174001\"]" description:"任务ID列表"`
}
// AsyncTaskResult defines the structure for a single task result
type AsyncTaskResult struct {
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"`
Status string `json:"status" example:"COMPLETED" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
Progress *int `json:"progress,omitempty" example:"65" description:"任务进度(0-100)仅当状态为RUNNING时返回"`
CreatedAt int64 `json:"created_at" example:"1741846200" description:"任务创建时间戳"`
FinishedAt *int64 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳仅当状态为COMPLETED或FAILED时返回"`
Result map[string]interface{} `json:"result,omitempty" swaggertype:"object" description:"任务结果仅当状态为COMPLETED时返回"`
ErrorCode *int `json:"error_code,omitempty" example:"400102" description:"错误码仅当状态为FAILED时返回"`
ErrorMessage *string `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息仅当状态为FAILED时返回"`
ErrorDetail map[string]interface{} `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情仅当状态为FAILED时返回"`
}
// AsyncTaskResultQueryResponse defines the response structure for querying task results
type AsyncTaskResultQueryResponse struct {
Total int `json:"total" example:"3" description:"查询的任务总数"`
Tasks []AsyncTaskResult `json:"tasks" description:"任务结果列表"`
}
// AsyncTaskProgressUpdate defines the structure for task progress update
type AsyncTaskProgressUpdate struct {
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
Progress int `json:"progress" example:"50" description:"任务进度(0-100)"`
}
// AsyncTaskStatusUpdate defines the structure for task status update
type AsyncTaskStatusUpdate struct {
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
Status string `json:"status" example:"RUNNING" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
Timestamp int64 `json:"timestamp" example:"1741846205" description:"状态更新时间戳"`
}
// TopologyAnalysisParams defines the parameters for topology analysis task
type TopologyAnalysisParams struct {
StartUUID string `json:"start_uuid" example:"comp-001" description:"起始元件UUID"`
EndUUID string `json:"end_uuid" example:"comp-999" description:"目标元件UUID"`
}
// PerformanceAnalysisParams defines the parameters for performance analysis task
type PerformanceAnalysisParams struct {
ComponentIDs []string `json:"component_ids" example:"[\"comp-001\",\"comp-002\"]" description:"需要分析的元件ID列表"`
TimeRange struct {
Start time.Time `json:"start" example:"2026-03-01T00:00:00Z" description:"分析开始时间"`
End time.Time `json:"end" example:"2026-03-02T00:00:00Z" description:"分析结束时间"`
} `json:"time_range" description:"分析时间范围"`
}
// EventAnalysisParams defines the parameters for event analysis task
type EventAnalysisParams struct {
EventType string `json:"event_type" example:"MOTOR_START" description:"事件类型"`
StartTime time.Time `json:"start_time" example:"2026-03-01T00:00:00Z" description:"事件开始时间"`
EndTime time.Time `json:"end_time" example:"2026-03-02T00:00:00Z" description:"事件结束时间"`
Components []string `json:"components,omitempty" example:"[\"comp-001\",\"comp-002\"]" description:"关联的元件列表"`
}
// BatchImportParams defines the parameters for batch import task
type BatchImportParams struct {
FilePath string `json:"file_path" example:"/data/import/model.csv" description:"导入文件路径"`
FileType string `json:"file_type" example:"CSV" description:"文件类型CSV, JSON, XML"`
Options struct {
Overwrite bool `json:"overwrite" example:"false" description:"是否覆盖现有数据"`
Validate bool `json:"validate" example:"true" description:"是否进行数据验证"`
NotifyUser bool `json:"notify_user" example:"true" description:"是否通知用户"`
} `json:"options" description:"导入选项"`
}