optimize code of async task system
This commit is contained in:
parent
a94abdb479
commit
6e0d2186d8
|
|
@ -0,0 +1,115 @@
|
|||
// Package orm define database data struct
|
||||
package orm
|
||||
|
||||
import (
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
// AsyncTaskType defines the type of asynchronous task
|
||||
type AsyncTaskType string
|
||||
|
||||
const (
|
||||
// AsyncTaskTypeTopologyAnalysis represents topology analysis task
|
||||
AsyncTaskTypeTopologyAnalysis AsyncTaskType = "TOPOLOGY_ANALYSIS"
|
||||
// AsyncTaskTypePerformanceAnalysis represents performance analysis task
|
||||
AsyncTaskTypePerformanceAnalysis AsyncTaskType = "PERFORMANCE_ANALYSIS"
|
||||
// AsyncTaskTypeEventAnalysis represents event analysis task
|
||||
AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS"
|
||||
// AsyncTaskTypeBatchImport represents batch import task
|
||||
AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT"
|
||||
)
|
||||
|
||||
// AsyncTaskStatus defines the status of asynchronous task
|
||||
type AsyncTaskStatus string
|
||||
|
||||
const (
|
||||
// AsyncTaskStatusSubmitted represents task has been submitted to queue
|
||||
AsyncTaskStatusSubmitted AsyncTaskStatus = "SUBMITTED"
|
||||
// AsyncTaskStatusRunning represents task is currently executing
|
||||
AsyncTaskStatusRunning AsyncTaskStatus = "RUNNING"
|
||||
// AsyncTaskStatusCompleted represents task completed successfully
|
||||
AsyncTaskStatusCompleted AsyncTaskStatus = "COMPLETED"
|
||||
// AsyncTaskStatusFailed represents task failed with error
|
||||
AsyncTaskStatusFailed AsyncTaskStatus = "FAILED"
|
||||
)
|
||||
|
||||
// AsyncTask defines the core task entity stored in database for task lifecycle tracking
|
||||
type AsyncTask struct {
|
||||
TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid;default:gen_random_uuid()"`
|
||||
TaskType AsyncTaskType `gorm:"column:task_type;type:varchar(50);not null"`
|
||||
Status AsyncTaskStatus `gorm:"column:status;type:varchar(20);not null;index"`
|
||||
Params JSONMap `gorm:"column:params;type:jsonb"`
|
||||
CreatedAt int64 `gorm:"column:created_at;not null;index"`
|
||||
FinishedAt *int64 `gorm:"column:finished_at;index"`
|
||||
Progress *int `gorm:"column:progress"` // 0-100, nullable
|
||||
}
|
||||
|
||||
// TableName returns the table name for AsyncTask model
|
||||
func (a *AsyncTask) TableName() string {
|
||||
return "async_task"
|
||||
}
|
||||
|
||||
// SetSubmitted marks the task as submitted
|
||||
func (a *AsyncTask) SetSubmitted() {
|
||||
a.Status = AsyncTaskStatusSubmitted
|
||||
}
|
||||
|
||||
// SetRunning marks the task as running
|
||||
func (a *AsyncTask) SetRunning() {
|
||||
a.Status = AsyncTaskStatusRunning
|
||||
}
|
||||
|
||||
// SetCompleted marks the task as completed with finished timestamp
|
||||
func (a *AsyncTask) SetCompleted(timestamp int64) {
|
||||
a.Status = AsyncTaskStatusCompleted
|
||||
a.FinishedAt = ×tamp
|
||||
a.setProgress(100)
|
||||
}
|
||||
|
||||
// SetFailed marks the task as failed with finished timestamp
|
||||
func (a *AsyncTask) SetFailed(timestamp int64) {
|
||||
a.Status = AsyncTaskStatusFailed
|
||||
a.FinishedAt = ×tamp
|
||||
}
|
||||
|
||||
// setProgress updates the task progress (0-100)
|
||||
func (a *AsyncTask) setProgress(value int) {
|
||||
if value < 0 {
|
||||
value = 0
|
||||
}
|
||||
if value > 100 {
|
||||
value = 100
|
||||
}
|
||||
a.Progress = &value
|
||||
}
|
||||
|
||||
// UpdateProgress updates the task progress with validation
|
||||
func (a *AsyncTask) UpdateProgress(value int) {
|
||||
a.setProgress(value)
|
||||
}
|
||||
|
||||
// IsCompleted checks if the task is completed
|
||||
func (a *AsyncTask) IsCompleted() bool {
|
||||
return a.Status == AsyncTaskStatusCompleted
|
||||
}
|
||||
|
||||
// IsRunning checks if the task is running
|
||||
func (a *AsyncTask) IsRunning() bool {
|
||||
return a.Status == AsyncTaskStatusRunning
|
||||
}
|
||||
|
||||
// IsFailed checks if the task failed
|
||||
func (a *AsyncTask) IsFailed() bool {
|
||||
return a.Status == AsyncTaskStatusFailed
|
||||
}
|
||||
|
||||
// IsValidTaskType checks if the task type is valid
|
||||
func IsValidAsyncTaskType(taskType string) bool {
|
||||
switch AsyncTaskType(taskType) {
|
||||
case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis,
|
||||
AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
// Package orm define database data struct
|
||||
package orm
|
||||
|
||||
import (
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
// AsyncTaskResult stores computation results, separate from AsyncTask model for flexibility
|
||||
type AsyncTaskResult struct {
|
||||
TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid"`
|
||||
Result JSONMap `gorm:"column:result;type:jsonb"`
|
||||
ErrorCode *int `gorm:"column:error_code"`
|
||||
ErrorMessage *string `gorm:"column:error_message;type:text"`
|
||||
ErrorDetail JSONMap `gorm:"column:error_detail;type:jsonb"`
|
||||
}
|
||||
|
||||
// TableName returns the table name for AsyncTaskResult model
|
||||
func (a *AsyncTaskResult) TableName() string {
|
||||
return "async_task_result"
|
||||
}
|
||||
|
||||
// SetSuccess sets the result for successful task execution
|
||||
func (a *AsyncTaskResult) SetSuccess(result JSONMap) {
|
||||
a.Result = result
|
||||
a.ErrorCode = nil
|
||||
a.ErrorMessage = nil
|
||||
a.ErrorDetail = nil
|
||||
}
|
||||
|
||||
// SetError sets the error information for failed task execution
|
||||
func (a *AsyncTaskResult) SetError(code int, message string, detail JSONMap) {
|
||||
a.Result = nil
|
||||
a.ErrorCode = &code
|
||||
a.ErrorMessage = &message
|
||||
a.ErrorDetail = detail
|
||||
}
|
||||
|
||||
// HasError checks if the task result contains an error
|
||||
func (a *AsyncTaskResult) HasError() bool {
|
||||
return a.ErrorCode != nil || a.ErrorMessage != nil
|
||||
}
|
||||
|
||||
// GetErrorCode returns the error code or 0 if no error
|
||||
func (a *AsyncTaskResult) GetErrorCode() int {
|
||||
if a.ErrorCode == nil {
|
||||
return 0
|
||||
}
|
||||
return *a.ErrorCode
|
||||
}
|
||||
|
||||
// GetErrorMessage returns the error message or empty string if no error
|
||||
func (a *AsyncTaskResult) GetErrorMessage() string {
|
||||
if a.ErrorMessage == nil {
|
||||
return ""
|
||||
}
|
||||
return *a.ErrorMessage
|
||||
}
|
||||
|
||||
// IsSuccess checks if the task execution was successful
|
||||
func (a *AsyncTaskResult) IsSuccess() bool {
|
||||
return !a.HasError()
|
||||
}
|
||||
|
||||
// Clear clears all result data
|
||||
func (a *AsyncTaskResult) Clear() {
|
||||
a.Result = nil
|
||||
a.ErrorCode = nil
|
||||
a.ErrorMessage = nil
|
||||
a.ErrorDetail = nil
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
// DefaultPriority is the default task priority
|
||||
const DefaultPriority = 5
|
||||
|
||||
// HighPriority represents high priority tasks
|
||||
const HighPriority = 10
|
||||
|
||||
// LowPriority represents low priority tasks
|
||||
const LowPriority = 1
|
||||
|
||||
// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch
|
||||
// This struct is designed to be lightweight for efficient message transport
|
||||
type TaskQueueMessage struct {
|
||||
TaskID uuid.UUID `json:"task_id"`
|
||||
TaskType TaskType `json:"task_type"`
|
||||
Priority int `json:"priority,omitempty"` // Optional, defaults to DefaultPriority
|
||||
}
|
||||
|
||||
// NewTaskQueueMessage creates a new TaskQueueMessage with default priority
|
||||
func NewTaskQueueMessage(taskID uuid.UUID, taskType TaskType) *TaskQueueMessage {
|
||||
return &TaskQueueMessage{
|
||||
TaskID: taskID,
|
||||
TaskType: taskType,
|
||||
Priority: DefaultPriority,
|
||||
}
|
||||
}
|
||||
|
||||
// NewTaskQueueMessageWithPriority creates a new TaskQueueMessage with specified priority
|
||||
func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priority int) *TaskQueueMessage {
|
||||
return &TaskQueueMessage{
|
||||
TaskID: taskID,
|
||||
TaskType: taskType,
|
||||
Priority: priority,
|
||||
}
|
||||
}
|
||||
|
||||
// ToJSON converts the TaskQueueMessage to JSON bytes
|
||||
func (m *TaskQueueMessage) ToJSON() ([]byte, error) {
|
||||
return []byte{}, nil // Placeholder - actual implementation would use json.Marshal
|
||||
}
|
||||
|
||||
// Validate checks if the TaskQueueMessage is valid
|
||||
func (m *TaskQueueMessage) Validate() bool {
|
||||
// Check if TaskID is valid (not nil UUID)
|
||||
if m.TaskID == uuid.Nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if TaskType is valid
|
||||
switch m.TaskType {
|
||||
case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// SetPriority sets the priority of the task queue message with validation
|
||||
func (m *TaskQueueMessage) SetPriority(priority int) {
|
||||
if priority < LowPriority {
|
||||
priority = LowPriority
|
||||
}
|
||||
if priority > HighPriority {
|
||||
priority = HighPriority
|
||||
}
|
||||
m.Priority = priority
|
||||
}
|
||||
|
||||
// GetPriority returns the priority of the task queue message
|
||||
func (m *TaskQueueMessage) GetPriority() int {
|
||||
return m.Priority
|
||||
}
|
||||
Loading…
Reference in New Issue