Refactor async task system with unified task interfaces and add test task type

- Create task/types_v2.go with unified task type definitions and interfaces
    * Add UnifiedTaskType and UnifiedTaskStatus constants
    * Define Task:Params interface for parameter validation and serialization
    * Define UnifiedTask interface as base for all task implementations
    * Add BaseTask for common task functionality
This commit is contained in:
douxu 2026-04-14 17:00:30 +08:00
parent f8c0951a13
commit 4d5fcbc376
5 changed files with 319 additions and 1 deletions

View File

@ -395,6 +395,8 @@ func validateTaskParams(taskType string, params map[string]any) bool {
return validateEventAnalysisParams(params)
case string(orm.AsyncTaskTypeBatchImport):
return validateBatchImportParams(params)
case string(orm.AsyncTaskTypeTest):
return validateTestTaskParams(params)
default:
return false
}
@ -437,6 +439,12 @@ func validateBatchImportParams(params map[string]any) bool {
return true
}
func validateTestTaskParams(params map[string]any) bool {
// Test task has optional parameters, all are valid
// sleep_duration defaults to 60 seconds if not provided
return true
}
func splitCommaSeparated(s string) []string {
var result []string
var current strings.Builder

View File

@ -17,6 +17,8 @@ const (
AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS"
// AsyncTaskTypeBatchImport represents batch import task
AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT"
// AsyncTaskTypeTest represents test task for system verification
AsyncTaskTypeTest AsyncTaskType = "TEST"
)
// AsyncTaskStatus defines the status of asynchronous task
@ -119,7 +121,7 @@ func (a *AsyncTask) IsFailed() bool {
func IsValidAsyncTaskType(taskType string) bool {
switch AsyncTaskType(taskType) {
case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis,
AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport:
AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport, AsyncTaskTypeTest:
return true
default:
return false

View File

@ -65,6 +65,7 @@ func (f *HandlerFactory) CreateDefaultHandlers() {
f.RegisterHandler(TypeTopologyAnalysis, &TopologyAnalysisHandler{})
f.RegisterHandler(TypeEventAnalysis, &EventAnalysisHandler{})
f.RegisterHandler(TypeBatchImport, &BatchImportHandler{})
f.RegisterHandler(TaskType(TaskTypeTest), NewTestTaskHandler())
}
// BaseHandler provides common functionality for all task handlers

169
task/test_task.go Normal file
View File

@ -0,0 +1,169 @@
// Package task provides test task implementation for system verification
package task
import (
"context"
"fmt"
"time"
"modelRT/database"
"modelRT/logger"
"modelRT/orm"
"github.com/gofrs/uuid"
"gorm.io/gorm"
)
// TestTaskParams defines parameters for test task
type TestTaskParams struct {
// SleepDuration specifies how long the task should sleep (in seconds)
// Default is 60 seconds as per requirement
SleepDuration int `json:"sleep_duration"`
// Message is a custom message to include in the result
Message string `json:"message,omitempty"`
}
// Validate checks if test task parameters are valid
func (p *TestTaskParams) Validate() error {
// Default to 60 seconds if not specified
if p.SleepDuration <= 0 {
p.SleepDuration = 60
}
// Validate max duration (max 1 hour)
if p.SleepDuration > 3600 {
return fmt.Errorf("sleep duration cannot exceed 3600 seconds (1 hour)")
}
return nil
}
// GetType returns the task type
func (p *TestTaskParams) GetType() UnifiedTaskType {
return TaskTypeTest
}
// ToMap converts parameters to map for database storage
func (p *TestTaskParams) ToMap() map[string]interface{} {
return map[string]interface{}{
"sleep_duration": p.SleepDuration,
"message": p.Message,
}
}
// FromMap populates parameters from map (for database retrieval)
func (p *TestTaskParams) FromMap(params map[string]interface{}) error {
if v, ok := params["sleep_duration"]; ok {
if duration, isFloat := v.(float64); isFloat {
p.SleepDuration = int(duration)
} else if duration, isInt := v.(int); isInt {
p.SleepDuration = duration
}
}
if v, ok := params["message"]; ok {
if msg, isString := v.(string); isString {
p.Message = msg
}
}
return nil
}
// TestTask implements a test task that sleeps for specified duration
// This task contains no I/O operations as per requirements
type TestTask struct {
*BaseTask
}
// NewTestTask creates a new TestTask instance
func NewTestTask(params TestTaskParams) *TestTask {
return &TestTask{
BaseTask: NewBaseTask(TaskTypeTest, &params, "test_task"),
}
}
// Execute performs the test task logic (sleep without I/O operations)
func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) error {
params, ok := t.GetParams().(*TestTaskParams)
if !ok {
return fmt.Errorf("invalid parameter type for TestTask")
}
logger.Info(ctx, "Starting test task execution",
"task_id", taskID,
"sleep_duration_seconds", params.SleepDuration,
"message", params.Message,
)
// Sleep for the specified duration without any I/O operations
// This is pure CPU-time wait as per requirements
sleepDuration := time.Duration(params.SleepDuration) * time.Second
time.Sleep(sleepDuration)
// Build result
result := map[string]interface{}{
"status": "completed",
"sleep_duration": params.SleepDuration,
"message": params.Message,
"executed_at": time.Now().Unix(),
"task_id": taskID.String(),
}
// Save result to database
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
logger.Error(ctx, "Failed to save test task result",
"task_id", taskID,
"error", err,
)
return fmt.Errorf("failed to save task result: %w", err)
}
logger.Info(ctx, "Test task completed successfully",
"task_id", taskID,
"sleep_duration_seconds", params.SleepDuration,
)
return nil
}
// TestTaskHandler handles test task execution
type TestTaskHandler struct {
*BaseHandler
}
// NewTestTaskHandler creates a new TestTaskHandler
func NewTestTaskHandler() *TestTaskHandler {
return &TestTaskHandler{
BaseHandler: NewBaseHandler("test_task_handler"),
}
}
// Execute processes a test task using the unified task interface
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error {
logger.Info(ctx, "Executing test task",
"task_id", taskID,
"task_type", taskType,
)
// Fetch task parameters from database
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
if err != nil {
return fmt.Errorf("failed to fetch task: %w", err)
}
// Convert params map to TestTaskParams
params := &TestTaskParams{}
if err := params.FromMap(map[string]interface{}(asyncTask.Params)); err != nil {
return fmt.Errorf("failed to parse task params: %w", err)
}
// Create and execute test task
testTask := NewTestTask(*params)
return testTask.Execute(ctx, taskID, db)
}
// CanHandle returns true for test tasks
func (h *TestTaskHandler) CanHandle(taskType TaskType) bool {
return string(TaskTypeTest) == string(taskType)
}

138
task/types_v2.go Normal file
View File

@ -0,0 +1,138 @@
// Package task provides unified task type definitions and interfaces
package task
import (
"context"
"fmt"
"github.com/gofrs/uuid"
"gorm.io/gorm"
)
// UnifiedTaskType defines all async task types in a single location
type UnifiedTaskType string
const (
// TaskTypeTopologyAnalysis represents topology analysis task
TaskTypeTopologyAnalysis UnifiedTaskType = "TOPOLOGY_ANALYSIS"
// TaskTypePerformanceAnalysis represents performance analysis task
TaskTypePerformanceAnalysis UnifiedTaskType = "PERFORMANCE_ANALYSIS"
// TaskTypeEventAnalysis represents event analysis task
TaskTypeEventAnalysis UnifiedTaskType = "EVENT_ANALYSIS"
// TaskTypeBatchImport represents batch import task
TaskTypeBatchImport UnifiedTaskType = "BATCH_IMPORT"
// TaskTypeTest represents test task for system verification
TaskTypeTest UnifiedTaskType = "TEST"
)
// UnifiedTaskStatus defines task status constants
type UnifiedTaskStatus string
const (
// TaskStatusPending represents task waiting to be processed
TaskStatusPending UnifiedTaskStatus = "PENDING"
// TaskStatusRunning represents task currently executing
TaskStatusRunning UnifiedTaskStatus = "RUNNING"
// TaskStatusCompleted represents task finished successfully
TaskStatusCompleted UnifiedTaskStatus = "COMPLETED"
// TaskStatusFailed represents task failed with error
TaskStatusFailed UnifiedTaskStatus = "FAILED"
)
// TaskParams defines the interface for task-specific parameters
// All task types must implement this interface to provide their parameter structure
type TaskParams interface {
// Validate checks if the parameters are valid for this task type
Validate() error
// GetType returns the task type these parameters are for
GetType() UnifiedTaskType
// ToMap converts parameters to map for database storage
ToMap() map[string]interface{}
// FromMap populates parameters from map (for database retrieval)
FromMap(params map[string]interface{}) error
}
// UnifiedTask defines the base interface that all tasks must implement
// This provides a clean abstraction for task execution and management
type UnifiedTask interface {
// GetType returns the task type
GetType() UnifiedTaskType
// GetParams returns the task parameters
GetParams() TaskParams
// Execute performs the actual task logic
Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) error
// GetName returns a human-readable task name for logging
GetName() string
// Validate checks if the task is valid before execution
Validate() error
}
// BaseTask provides common functionality for all task implementations
type BaseTask struct {
taskType UnifiedTaskType
params TaskParams
name string
}
// NewBaseTask creates a new BaseTask instance
func NewBaseTask(taskType UnifiedTaskType, params TaskParams, name string) *BaseTask {
return &BaseTask{
taskType: taskType,
params: params,
name: name,
}
}
// GetType returns the task type
func (t *BaseTask) GetType() UnifiedTaskType {
return t.taskType
}
// GetParams returns the task parameters
func (t *BaseTask) GetParams() TaskParams {
return t.params
}
// GetName returns the task name
func (t *BaseTask) GetName() string {
return t.name
}
// Validate checks if the task is valid
func (t *BaseTask) Validate() error {
if t.params == nil {
return fmt.Errorf("task parameters cannot be nil")
}
if t.taskType != t.params.GetType() {
return fmt.Errorf("task type mismatch: expected %s, got %s", t.taskType, t.params.GetType())
}
return t.params.Validate()
}
// IsTaskType checks if a task type string is valid
func IsTaskType(taskType string) bool {
switch UnifiedTaskType(taskType) {
case TaskTypeTopologyAnalysis, TaskTypePerformanceAnalysis,
TaskTypeEventAnalysis, TaskTypeBatchImport, TaskTypeTest:
return true
default:
return false
}
}
// GetTaskTypes returns all registered task types
func GetTaskTypes() []UnifiedTaskType {
return []UnifiedTaskType{
TaskTypeTopologyAnalysis,
TaskTypePerformanceAnalysis,
TaskTypeEventAnalysis,
TaskTypeBatchImport,
TaskTypeTest,
}
}