// Package database define database operation functions package database import ( "context" "time" "modelRT/orm" "github.com/gofrs/uuid" "gorm.io/gorm" "gorm.io/gorm/clause" ) // CreateAsyncTask creates a new async task in the database func CreateAsyncTask(ctx context.Context, tx *gorm.DB, taskType orm.AsyncTaskType, params orm.JSONMap) (*orm.AsyncTask, error) { taskID, err := uuid.NewV4() if err != nil { return nil, err } task := &orm.AsyncTask{ TaskID: taskID, TaskType: taskType, Status: orm.AsyncTaskStatusSubmitted, Params: params, CreatedAt: time.Now().Unix(), } // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx).Create(task) if result.Error != nil { return nil, result.Error } return task, nil } // GetAsyncTaskByID retrieves an async task by its ID func GetAsyncTaskByID(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTask, error) { var task orm.AsyncTask // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("task_id = ?", taskID). Clauses(clause.Locking{Strength: "UPDATE"}). First(&task) if result.Error != nil { return nil, result.Error } return &task, nil } // GetAsyncTasksByIDs retrieves multiple async tasks by their IDs func GetAsyncTasksByIDs(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTask, error) { var tasks []orm.AsyncTask if len(taskIDs) == 0 { return tasks, nil } // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("task_id IN ?", taskIDs). Clauses(clause.Locking{Strength: "UPDATE"}). Find(&tasks) if result.Error != nil { return nil, result.Error } return tasks, nil } // UpdateAsyncTaskStatus updates the status of an async task func UpdateAsyncTaskStatus(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, status orm.AsyncTaskStatus) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Model(&orm.AsyncTask{}). Where("task_id = ?", taskID). Update("status", status) return result.Error } // UpdateAsyncTaskProgress updates the progress of an async task func UpdateAsyncTaskProgress(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, progress int) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Model(&orm.AsyncTask{}). Where("task_id = ?", taskID). Update("progress", progress) return result.Error } // CompleteAsyncTask marks an async task as completed with timestamp func CompleteAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Model(&orm.AsyncTask{}). Where("task_id = ?", taskID). Updates(map[string]any{ "status": orm.AsyncTaskStatusCompleted, "finished_at": timestamp, "progress": 100, }) return result.Error } // FailAsyncTask marks an async task as failed with timestamp func FailAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Model(&orm.AsyncTask{}). Where("task_id = ?", taskID). Updates(map[string]any{ "status": orm.AsyncTaskStatusFailed, "finished_at": timestamp, }) return result.Error } // CreateAsyncTaskResult creates a result record for an async task func CreateAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error { taskResult := &orm.AsyncTaskResult{ TaskID: taskID, Result: result, } // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() resultOp := tx.WithContext(cancelCtx).Create(taskResult) return resultOp.Error } // UpdateAsyncTaskResultWithError updates a task result with error information func UpdateAsyncTaskResultWithError(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, code int, message string, detail orm.JSONMap) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // Update with error information result := tx.WithContext(cancelCtx). Model(&orm.AsyncTaskResult{}). Where("task_id = ?", taskID). Updates(map[string]any{ "error_code": code, "error_message": message, "error_detail": detail, "result": nil, }) return result.Error } // UpdateAsyncTaskResultWithSuccess updates a task result with success information func UpdateAsyncTaskResultWithSuccess(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // First try to update existing record, if not found create new one existingResult := tx.WithContext(cancelCtx). Where("task_id = ?", taskID). FirstOrCreate(&orm.AsyncTaskResult{TaskID: taskID}) if existingResult.Error != nil { return existingResult.Error } // Update with success information updateResult := tx.WithContext(cancelCtx). Model(&orm.AsyncTaskResult{}). Where("task_id = ?", taskID). Updates(map[string]any{ "result": result, "error_code": nil, "error_message": nil, "error_detail": nil, }) return updateResult.Error } // GetAsyncTaskResult retrieves the result of an async task func GetAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTaskResult, error) { var taskResult orm.AsyncTaskResult // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("task_id = ?", taskID). First(&taskResult) if result.Error != nil { if result.Error == gorm.ErrRecordNotFound { return nil, nil } return nil, result.Error } return &taskResult, nil } // GetAsyncTaskResults retrieves multiple task results by task IDs func GetAsyncTaskResults(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTaskResult, error) { var taskResults []orm.AsyncTaskResult if len(taskIDs) == 0 { return taskResults, nil } // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("task_id IN ?", taskIDs). Find(&taskResults) if result.Error != nil { return nil, result.Error } return taskResults, nil } // GetPendingTasks retrieves pending tasks (submitted but not yet running/completed) func GetPendingTasks(ctx context.Context, tx *gorm.DB, limit int) ([]orm.AsyncTask, error) { var tasks []orm.AsyncTask // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("status = ?", orm.AsyncTaskStatusSubmitted). Order("created_at ASC"). Limit(limit). Find(&tasks) if result.Error != nil { return nil, result.Error } return tasks, nil } // GetTasksByStatus retrieves tasks by status func GetTasksByStatus(ctx context.Context, tx *gorm.DB, status orm.AsyncTaskStatus, limit int) ([]orm.AsyncTask, error) { var tasks []orm.AsyncTask // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("status = ?", status). Order("created_at ASC"). Limit(limit). Find(&tasks) if result.Error != nil { return nil, result.Error } return tasks, nil } // DeleteOldTasks deletes tasks older than the specified timestamp func DeleteOldTasks(ctx context.Context, tx *gorm.DB, olderThan int64) error { // ctx timeout judgment cancelCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() // First delete task results result := tx.WithContext(cancelCtx). Where("task_id IN (SELECT task_id FROM async_task WHERE created_at < ?)", olderThan). Delete(&orm.AsyncTaskResult{}) if result.Error != nil { return result.Error } // Then delete tasks result = tx.WithContext(cancelCtx). Where("created_at < ?", olderThan). Delete(&orm.AsyncTask{}) return result.Error }