diff --git a/handler/async_task_handler.go b/handler/async_task_handler.go index 2d7c3e9..a711177 100644 --- a/handler/async_task_handler.go +++ b/handler/async_task_handler.go @@ -395,6 +395,8 @@ func validateTaskParams(taskType string, params map[string]any) bool { return validateEventAnalysisParams(params) case string(orm.AsyncTaskTypeBatchImport): return validateBatchImportParams(params) + case string(orm.AsyncTaskTypeTest): + return validateTestTaskParams(params) default: return false } @@ -437,6 +439,12 @@ func validateBatchImportParams(params map[string]any) bool { return true } +func validateTestTaskParams(params map[string]any) bool { + // Test task has optional parameters, all are valid + // sleep_duration defaults to 60 seconds if not provided + return true +} + func splitCommaSeparated(s string) []string { var result []string var current strings.Builder diff --git a/orm/async_task.go b/orm/async_task.go index 40e312d..37709bf 100644 --- a/orm/async_task.go +++ b/orm/async_task.go @@ -17,6 +17,8 @@ const ( AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS" // AsyncTaskTypeBatchImport represents batch import task AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT" + // AsyncTaskTypeTest represents test task for system verification + AsyncTaskTypeTest AsyncTaskType = "TEST" ) // AsyncTaskStatus defines the status of asynchronous task @@ -119,7 +121,7 @@ func (a *AsyncTask) IsFailed() bool { func IsValidAsyncTaskType(taskType string) bool { switch AsyncTaskType(taskType) { case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis, - AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport: + AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport, AsyncTaskTypeTest: return true default: return false diff --git a/task/handler_factory.go b/task/handler_factory.go index 9f6b7c2..c472fef 100644 --- a/task/handler_factory.go +++ b/task/handler_factory.go @@ -65,6 +65,7 @@ func (f *HandlerFactory) CreateDefaultHandlers() { f.RegisterHandler(TypeTopologyAnalysis, &TopologyAnalysisHandler{}) f.RegisterHandler(TypeEventAnalysis, &EventAnalysisHandler{}) f.RegisterHandler(TypeBatchImport, &BatchImportHandler{}) + f.RegisterHandler(TaskType(TaskTypeTest), NewTestTaskHandler()) } // BaseHandler provides common functionality for all task handlers diff --git a/task/test_task.go b/task/test_task.go new file mode 100644 index 0000000..2d60bd4 --- /dev/null +++ b/task/test_task.go @@ -0,0 +1,169 @@ +// Package task provides test task implementation for system verification +package task + +import ( + "context" + "fmt" + "time" + + "modelRT/database" + "modelRT/logger" + "modelRT/orm" + + "github.com/gofrs/uuid" + "gorm.io/gorm" +) + +// TestTaskParams defines parameters for test task +type TestTaskParams struct { + // SleepDuration specifies how long the task should sleep (in seconds) + // Default is 60 seconds as per requirement + SleepDuration int `json:"sleep_duration"` + // Message is a custom message to include in the result + Message string `json:"message,omitempty"` +} + +// Validate checks if test task parameters are valid +func (p *TestTaskParams) Validate() error { + // Default to 60 seconds if not specified + if p.SleepDuration <= 0 { + p.SleepDuration = 60 + } + + // Validate max duration (max 1 hour) + if p.SleepDuration > 3600 { + return fmt.Errorf("sleep duration cannot exceed 3600 seconds (1 hour)") + } + + return nil +} + +// GetType returns the task type +func (p *TestTaskParams) GetType() UnifiedTaskType { + return TaskTypeTest +} + +// ToMap converts parameters to map for database storage +func (p *TestTaskParams) ToMap() map[string]interface{} { + return map[string]interface{}{ + "sleep_duration": p.SleepDuration, + "message": p.Message, + } +} + +// FromMap populates parameters from map (for database retrieval) +func (p *TestTaskParams) FromMap(params map[string]interface{}) error { + if v, ok := params["sleep_duration"]; ok { + if duration, isFloat := v.(float64); isFloat { + p.SleepDuration = int(duration) + } else if duration, isInt := v.(int); isInt { + p.SleepDuration = duration + } + } + + if v, ok := params["message"]; ok { + if msg, isString := v.(string); isString { + p.Message = msg + } + } + + return nil +} + +// TestTask implements a test task that sleeps for specified duration +// This task contains no I/O operations as per requirements +type TestTask struct { + *BaseTask +} + +// NewTestTask creates a new TestTask instance +func NewTestTask(params TestTaskParams) *TestTask { + return &TestTask{ + BaseTask: NewBaseTask(TaskTypeTest, ¶ms, "test_task"), + } +} + +// Execute performs the test task logic (sleep without I/O operations) +func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) error { + params, ok := t.GetParams().(*TestTaskParams) + if !ok { + return fmt.Errorf("invalid parameter type for TestTask") + } + + logger.Info(ctx, "Starting test task execution", + "task_id", taskID, + "sleep_duration_seconds", params.SleepDuration, + "message", params.Message, + ) + + // Sleep for the specified duration without any I/O operations + // This is pure CPU-time wait as per requirements + sleepDuration := time.Duration(params.SleepDuration) * time.Second + time.Sleep(sleepDuration) + + // Build result + result := map[string]interface{}{ + "status": "completed", + "sleep_duration": params.SleepDuration, + "message": params.Message, + "executed_at": time.Now().Unix(), + "task_id": taskID.String(), + } + + // Save result to database + if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil { + logger.Error(ctx, "Failed to save test task result", + "task_id", taskID, + "error", err, + ) + return fmt.Errorf("failed to save task result: %w", err) + } + + logger.Info(ctx, "Test task completed successfully", + "task_id", taskID, + "sleep_duration_seconds", params.SleepDuration, + ) + + return nil +} + +// TestTaskHandler handles test task execution +type TestTaskHandler struct { + *BaseHandler +} + +// NewTestTaskHandler creates a new TestTaskHandler +func NewTestTaskHandler() *TestTaskHandler { + return &TestTaskHandler{ + BaseHandler: NewBaseHandler("test_task_handler"), + } +} + +// Execute processes a test task using the unified task interface +func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { + logger.Info(ctx, "Executing test task", + "task_id", taskID, + "task_type", taskType, + ) + + // Fetch task parameters from database + asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID) + if err != nil { + return fmt.Errorf("failed to fetch task: %w", err) + } + + // Convert params map to TestTaskParams + params := &TestTaskParams{} + if err := params.FromMap(map[string]interface{}(asyncTask.Params)); err != nil { + return fmt.Errorf("failed to parse task params: %w", err) + } + + // Create and execute test task + testTask := NewTestTask(*params) + return testTask.Execute(ctx, taskID, db) +} + +// CanHandle returns true for test tasks +func (h *TestTaskHandler) CanHandle(taskType TaskType) bool { + return string(TaskTypeTest) == string(taskType) +} diff --git a/task/types_v2.go b/task/types_v2.go new file mode 100644 index 0000000..aed3558 --- /dev/null +++ b/task/types_v2.go @@ -0,0 +1,138 @@ +// Package task provides unified task type definitions and interfaces +package task + +import ( + "context" + "fmt" + + "github.com/gofrs/uuid" + "gorm.io/gorm" +) + +// UnifiedTaskType defines all async task types in a single location +type UnifiedTaskType string + +const ( + // TaskTypeTopologyAnalysis represents topology analysis task + TaskTypeTopologyAnalysis UnifiedTaskType = "TOPOLOGY_ANALYSIS" + // TaskTypePerformanceAnalysis represents performance analysis task + TaskTypePerformanceAnalysis UnifiedTaskType = "PERFORMANCE_ANALYSIS" + // TaskTypeEventAnalysis represents event analysis task + TaskTypeEventAnalysis UnifiedTaskType = "EVENT_ANALYSIS" + // TaskTypeBatchImport represents batch import task + TaskTypeBatchImport UnifiedTaskType = "BATCH_IMPORT" + // TaskTypeTest represents test task for system verification + TaskTypeTest UnifiedTaskType = "TEST" +) + +// UnifiedTaskStatus defines task status constants +type UnifiedTaskStatus string + +const ( + // TaskStatusPending represents task waiting to be processed + TaskStatusPending UnifiedTaskStatus = "PENDING" + // TaskStatusRunning represents task currently executing + TaskStatusRunning UnifiedTaskStatus = "RUNNING" + // TaskStatusCompleted represents task finished successfully + TaskStatusCompleted UnifiedTaskStatus = "COMPLETED" + // TaskStatusFailed represents task failed with error + TaskStatusFailed UnifiedTaskStatus = "FAILED" +) + +// TaskParams defines the interface for task-specific parameters +// All task types must implement this interface to provide their parameter structure +type TaskParams interface { + // Validate checks if the parameters are valid for this task type + Validate() error + // GetType returns the task type these parameters are for + GetType() UnifiedTaskType + // ToMap converts parameters to map for database storage + ToMap() map[string]interface{} + // FromMap populates parameters from map (for database retrieval) + FromMap(params map[string]interface{}) error +} + +// UnifiedTask defines the base interface that all tasks must implement +// This provides a clean abstraction for task execution and management +type UnifiedTask interface { + // GetType returns the task type + GetType() UnifiedTaskType + + // GetParams returns the task parameters + GetParams() TaskParams + + // Execute performs the actual task logic + Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) error + + // GetName returns a human-readable task name for logging + GetName() string + + // Validate checks if the task is valid before execution + Validate() error +} + +// BaseTask provides common functionality for all task implementations +type BaseTask struct { + taskType UnifiedTaskType + params TaskParams + name string +} + +// NewBaseTask creates a new BaseTask instance +func NewBaseTask(taskType UnifiedTaskType, params TaskParams, name string) *BaseTask { + return &BaseTask{ + taskType: taskType, + params: params, + name: name, + } +} + +// GetType returns the task type +func (t *BaseTask) GetType() UnifiedTaskType { + return t.taskType +} + +// GetParams returns the task parameters +func (t *BaseTask) GetParams() TaskParams { + return t.params +} + +// GetName returns the task name +func (t *BaseTask) GetName() string { + return t.name +} + +// Validate checks if the task is valid +func (t *BaseTask) Validate() error { + if t.params == nil { + return fmt.Errorf("task parameters cannot be nil") + } + + if t.taskType != t.params.GetType() { + return fmt.Errorf("task type mismatch: expected %s, got %s", t.taskType, t.params.GetType()) + } + + return t.params.Validate() +} + +// IsTaskType checks if a task type string is valid +func IsTaskType(taskType string) bool { + switch UnifiedTaskType(taskType) { + case TaskTypeTopologyAnalysis, TaskTypePerformanceAnalysis, + TaskTypeEventAnalysis, TaskTypeBatchImport, TaskTypeTest: + return true + default: + return false + } +} + +// GetTaskTypes returns all registered task types +func GetTaskTypes() []UnifiedTaskType { + return []UnifiedTaskType{ + TaskTypeTopologyAnalysis, + TaskTypePerformanceAnalysis, + TaskTypeEventAnalysis, + TaskTypeBatchImport, + TaskTypeTest, + } +}