From 6e0d2186d81c10105314a8a9f57a254834526c8e Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 12 Mar 2026 16:37:06 +0800 Subject: [PATCH] optimize code of async task system --- orm/async_task.go | 115 +++++++++++++++++++++++++++++++++++++++ orm/async_task_result.go | 70 ++++++++++++++++++++++++ task/queue_message.go | 77 ++++++++++++++++++++++++++ 3 files changed, 262 insertions(+) create mode 100644 orm/async_task.go create mode 100644 orm/async_task_result.go create mode 100644 task/queue_message.go diff --git a/orm/async_task.go b/orm/async_task.go new file mode 100644 index 0000000..52a03c6 --- /dev/null +++ b/orm/async_task.go @@ -0,0 +1,115 @@ +// Package orm define database data struct +package orm + +import ( + "github.com/gofrs/uuid" +) + +// AsyncTaskType defines the type of asynchronous task +type AsyncTaskType string + +const ( + // AsyncTaskTypeTopologyAnalysis represents topology analysis task + AsyncTaskTypeTopologyAnalysis AsyncTaskType = "TOPOLOGY_ANALYSIS" + // AsyncTaskTypePerformanceAnalysis represents performance analysis task + AsyncTaskTypePerformanceAnalysis AsyncTaskType = "PERFORMANCE_ANALYSIS" + // AsyncTaskTypeEventAnalysis represents event analysis task + AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS" + // AsyncTaskTypeBatchImport represents batch import task + AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT" +) + +// AsyncTaskStatus defines the status of asynchronous task +type AsyncTaskStatus string + +const ( + // AsyncTaskStatusSubmitted represents task has been submitted to queue + AsyncTaskStatusSubmitted AsyncTaskStatus = "SUBMITTED" + // AsyncTaskStatusRunning represents task is currently executing + AsyncTaskStatusRunning AsyncTaskStatus = "RUNNING" + // AsyncTaskStatusCompleted represents task completed successfully + AsyncTaskStatusCompleted AsyncTaskStatus = "COMPLETED" + // AsyncTaskStatusFailed represents task failed with error + AsyncTaskStatusFailed AsyncTaskStatus = "FAILED" +) + +// AsyncTask defines the core task entity stored in database for task lifecycle tracking +type AsyncTask struct { + TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid;default:gen_random_uuid()"` + TaskType AsyncTaskType `gorm:"column:task_type;type:varchar(50);not null"` + Status AsyncTaskStatus `gorm:"column:status;type:varchar(20);not null;index"` + Params JSONMap `gorm:"column:params;type:jsonb"` + CreatedAt int64 `gorm:"column:created_at;not null;index"` + FinishedAt *int64 `gorm:"column:finished_at;index"` + Progress *int `gorm:"column:progress"` // 0-100, nullable +} + +// TableName returns the table name for AsyncTask model +func (a *AsyncTask) TableName() string { + return "async_task" +} + +// SetSubmitted marks the task as submitted +func (a *AsyncTask) SetSubmitted() { + a.Status = AsyncTaskStatusSubmitted +} + +// SetRunning marks the task as running +func (a *AsyncTask) SetRunning() { + a.Status = AsyncTaskStatusRunning +} + +// SetCompleted marks the task as completed with finished timestamp +func (a *AsyncTask) SetCompleted(timestamp int64) { + a.Status = AsyncTaskStatusCompleted + a.FinishedAt = ×tamp + a.setProgress(100) +} + +// SetFailed marks the task as failed with finished timestamp +func (a *AsyncTask) SetFailed(timestamp int64) { + a.Status = AsyncTaskStatusFailed + a.FinishedAt = ×tamp +} + +// setProgress updates the task progress (0-100) +func (a *AsyncTask) setProgress(value int) { + if value < 0 { + value = 0 + } + if value > 100 { + value = 100 + } + a.Progress = &value +} + +// UpdateProgress updates the task progress with validation +func (a *AsyncTask) UpdateProgress(value int) { + a.setProgress(value) +} + +// IsCompleted checks if the task is completed +func (a *AsyncTask) IsCompleted() bool { + return a.Status == AsyncTaskStatusCompleted +} + +// IsRunning checks if the task is running +func (a *AsyncTask) IsRunning() bool { + return a.Status == AsyncTaskStatusRunning +} + +// IsFailed checks if the task failed +func (a *AsyncTask) IsFailed() bool { + return a.Status == AsyncTaskStatusFailed +} + +// IsValidTaskType checks if the task type is valid +func IsValidAsyncTaskType(taskType string) bool { + switch AsyncTaskType(taskType) { + case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis, + AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport: + return true + default: + return false + } +} diff --git a/orm/async_task_result.go b/orm/async_task_result.go new file mode 100644 index 0000000..a3c6213 --- /dev/null +++ b/orm/async_task_result.go @@ -0,0 +1,70 @@ +// Package orm define database data struct +package orm + +import ( + "github.com/gofrs/uuid" +) + +// AsyncTaskResult stores computation results, separate from AsyncTask model for flexibility +type AsyncTaskResult struct { + TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid"` + Result JSONMap `gorm:"column:result;type:jsonb"` + ErrorCode *int `gorm:"column:error_code"` + ErrorMessage *string `gorm:"column:error_message;type:text"` + ErrorDetail JSONMap `gorm:"column:error_detail;type:jsonb"` +} + +// TableName returns the table name for AsyncTaskResult model +func (a *AsyncTaskResult) TableName() string { + return "async_task_result" +} + +// SetSuccess sets the result for successful task execution +func (a *AsyncTaskResult) SetSuccess(result JSONMap) { + a.Result = result + a.ErrorCode = nil + a.ErrorMessage = nil + a.ErrorDetail = nil +} + +// SetError sets the error information for failed task execution +func (a *AsyncTaskResult) SetError(code int, message string, detail JSONMap) { + a.Result = nil + a.ErrorCode = &code + a.ErrorMessage = &message + a.ErrorDetail = detail +} + +// HasError checks if the task result contains an error +func (a *AsyncTaskResult) HasError() bool { + return a.ErrorCode != nil || a.ErrorMessage != nil +} + +// GetErrorCode returns the error code or 0 if no error +func (a *AsyncTaskResult) GetErrorCode() int { + if a.ErrorCode == nil { + return 0 + } + return *a.ErrorCode +} + +// GetErrorMessage returns the error message or empty string if no error +func (a *AsyncTaskResult) GetErrorMessage() string { + if a.ErrorMessage == nil { + return "" + } + return *a.ErrorMessage +} + +// IsSuccess checks if the task execution was successful +func (a *AsyncTaskResult) IsSuccess() bool { + return !a.HasError() +} + +// Clear clears all result data +func (a *AsyncTaskResult) Clear() { + a.Result = nil + a.ErrorCode = nil + a.ErrorMessage = nil + a.ErrorDetail = nil +} diff --git a/task/queue_message.go b/task/queue_message.go new file mode 100644 index 0000000..0c6512f --- /dev/null +++ b/task/queue_message.go @@ -0,0 +1,77 @@ +package task + +import ( + "github.com/gofrs/uuid" +) + +// DefaultPriority is the default task priority +const DefaultPriority = 5 + +// HighPriority represents high priority tasks +const HighPriority = 10 + +// LowPriority represents low priority tasks +const LowPriority = 1 + +// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch +// This struct is designed to be lightweight for efficient message transport +type TaskQueueMessage struct { + TaskID uuid.UUID `json:"task_id"` + TaskType TaskType `json:"task_type"` + Priority int `json:"priority,omitempty"` // Optional, defaults to DefaultPriority +} + +// NewTaskQueueMessage creates a new TaskQueueMessage with default priority +func NewTaskQueueMessage(taskID uuid.UUID, taskType TaskType) *TaskQueueMessage { + return &TaskQueueMessage{ + TaskID: taskID, + TaskType: taskType, + Priority: DefaultPriority, + } +} + +// NewTaskQueueMessageWithPriority creates a new TaskQueueMessage with specified priority +func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priority int) *TaskQueueMessage { + return &TaskQueueMessage{ + TaskID: taskID, + TaskType: taskType, + Priority: priority, + } +} + +// ToJSON converts the TaskQueueMessage to JSON bytes +func (m *TaskQueueMessage) ToJSON() ([]byte, error) { + return []byte{}, nil // Placeholder - actual implementation would use json.Marshal +} + +// Validate checks if the TaskQueueMessage is valid +func (m *TaskQueueMessage) Validate() bool { + // Check if TaskID is valid (not nil UUID) + if m.TaskID == uuid.Nil { + return false + } + + // Check if TaskType is valid + switch m.TaskType { + case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport: + return true + default: + return false + } +} + +// SetPriority sets the priority of the task queue message with validation +func (m *TaskQueueMessage) SetPriority(priority int) { + if priority < LowPriority { + priority = LowPriority + } + if priority > HighPriority { + priority = HighPriority + } + m.Priority = priority +} + +// GetPriority returns the priority of the task queue message +func (m *TaskQueueMessage) GetPriority() int { + return m.Priority +}