Refactor: extract task constants to dedicated constants package
- Add constants/task.go with centralized task-related constants
- Task priority levels (default, high, low)
- Task queue configuration (exchange, queue, routing key)
- Task message settings (max priority, TTL)
- Task retry settings (max retries, delays)
- Test task settings (sleep duration, max limit)
- Update task-related files to use constants from constants package:
- handler/async_task_create_handler.go
- task/queue_message.go
- task/queue_producer.go
- task/retry_manager.go
- task/test_task.go
- task/types.go (add TypeTest)
- task/worker.go
This commit is contained in:
parent
4a3f7a65bc
commit
809e1cd87d
|
|
@ -0,0 +1,54 @@
|
||||||
|
// Package constants defines task-related constants for the async task system
|
||||||
|
package constants
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Task priority levels
|
||||||
|
const (
|
||||||
|
// TaskPriorityDefault is the default priority level for tasks
|
||||||
|
TaskPriorityDefault = 5
|
||||||
|
// TaskPriorityHigh represents high priority tasks
|
||||||
|
TaskPriorityHigh = 10
|
||||||
|
// TaskPriorityLow represents low priority tasks
|
||||||
|
TaskPriorityLow = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
// Task queue configuration
|
||||||
|
const (
|
||||||
|
// TaskExchangeName is the name of the exchange for task routing
|
||||||
|
TaskExchangeName = "modelrt.tasks.exchange"
|
||||||
|
// TaskQueueName is the name of the main task queue
|
||||||
|
TaskQueueName = "modelrt.tasks.queue"
|
||||||
|
// TaskRoutingKey is the routing key for task messages
|
||||||
|
TaskRoutingKey = "modelrt.task"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Task message settings
|
||||||
|
const (
|
||||||
|
// TaskMaxPriority is the maximum priority level for tasks (0-10)
|
||||||
|
TaskMaxPriority = 10
|
||||||
|
// TaskDefaultMessageTTL is the default time-to-live for task messages (24 hours)
|
||||||
|
TaskDefaultMessageTTL = 24 * time.Hour
|
||||||
|
)
|
||||||
|
|
||||||
|
// Task retry settings
|
||||||
|
const (
|
||||||
|
// TaskRetryMaxDefault is the default maximum number of retry attempts
|
||||||
|
TaskRetryMaxDefault = 3
|
||||||
|
// TaskRetryInitialDelayDefault is the default initial delay for exponential backoff
|
||||||
|
TaskRetryInitialDelayDefault = 1 * time.Second
|
||||||
|
// TaskRetryMaxDelayDefault is the default maximum delay for exponential backoff
|
||||||
|
TaskRetryMaxDelayDefault = 5 * time.Minute
|
||||||
|
// TaskRetryRandomFactorDefault is the default random factor for jitter (10%)
|
||||||
|
TaskRetryRandomFactorDefault = 0.1
|
||||||
|
// TaskRetryFixedDelayDefault is the default delay for fixed retry strategy
|
||||||
|
TaskRetryFixedDelayDefault = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test task settings
|
||||||
|
const (
|
||||||
|
// TestTaskSleepDurationDefault is the default sleep duration for test tasks (60 seconds)
|
||||||
|
TestTaskSleepDurationDefault = 60
|
||||||
|
// TestTaskSleepDurationMax is the maximum allowed sleep duration for test tasks (1 hour)
|
||||||
|
TestTaskSleepDurationMax = 3600
|
||||||
|
)
|
||||||
|
|
@ -2,10 +2,8 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"modelRT/config"
|
"modelRT/config"
|
||||||
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -13,7 +11,6 @@ import (
|
||||||
"modelRT/task"
|
"modelRT/task"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AsyncTaskCreateHandler handles creation of asynchronous tasks
|
// AsyncTaskCreateHandler handles creation of asynchronous tasks
|
||||||
|
|
@ -32,59 +29,44 @@ func AsyncTaskCreateHandler(c *gin.Context) {
|
||||||
var request network.AsyncTaskCreateRequest
|
var request network.AsyncTaskCreateRequest
|
||||||
|
|
||||||
if err := c.ShouldBindJSON(&request); err != nil {
|
if err := c.ShouldBindJSON(&request); err != nil {
|
||||||
logger.Error(ctx, "failed to unmarshal async task create request", "error", err)
|
logger.Error(ctx, "unmarshal async task create request failed", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid request parameters", nil)
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid request parameters",
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate task type
|
// validate task type
|
||||||
if !orm.IsValidAsyncTaskType(request.TaskType) {
|
if !orm.IsValidAsyncTaskType(request.TaskType) {
|
||||||
logger.Error(ctx, "invalid task type", "task_type", request.TaskType)
|
logger.Error(ctx, "check task type invalid", "task_type", request.TaskType)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task type", nil)
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task type",
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate task parameters based on task type
|
// validate task parameters based on task type
|
||||||
if !validateTaskParams(request.TaskType, request.Params) {
|
if !validateTaskParams(request.TaskType, request.Params) {
|
||||||
logger.Error(ctx, "invalid task parameters", "task_type", request.TaskType, "params", request.Params)
|
logger.Error(ctx, "check task parameters invalid", "task_type", request.TaskType, "params", request.Params)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
renderRespFailure(c, constants.RespCodeInvalidParams, "invalid task parameters", nil)
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task parameters",
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pgClient := database.GetPostgresDBClient()
|
pgClient := database.GetPostgresDBClient()
|
||||||
if pgClient == nil {
|
if pgClient == nil {
|
||||||
logger.Error(ctx, "database connection not found in context")
|
logger.Error(ctx, "database connection not found in context")
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
renderRespFailure(c, constants.RespCodeServerError, "database connection error", nil)
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create task in database
|
// create task in database
|
||||||
taskType := orm.AsyncTaskType(request.TaskType)
|
taskType := orm.AsyncTaskType(request.TaskType)
|
||||||
params := orm.JSONMap(request.Params)
|
params := orm.JSONMap(request.Params)
|
||||||
|
|
||||||
asyncTask, err := database.CreateAsyncTask(ctx, pgClient, taskType, params)
|
asyncTask, err := database.CreateAsyncTask(ctx, pgClient, taskType, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "failed to create async task in database", "error", err)
|
logger.Error(ctx, "create async task in database failed", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
renderRespFailure(c, constants.RespCodeServerError, "failed to create task", nil)
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to create task",
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send task to message queue
|
// send task to message queue
|
||||||
cfg, exists := c.Get("config")
|
cfg, exists := c.Get("config")
|
||||||
if !exists {
|
if !exists {
|
||||||
logger.Warn(ctx, "Configuration not found in context, skipping queue publishing")
|
logger.Warn(ctx, "Configuration not found in context, skipping queue publishing")
|
||||||
|
|
@ -92,41 +74,36 @@ func AsyncTaskCreateHandler(c *gin.Context) {
|
||||||
modelRTConfig := cfg.(config.ModelRTConfig)
|
modelRTConfig := cfg.(config.ModelRTConfig)
|
||||||
ctx := c.Request.Context()
|
ctx := c.Request.Context()
|
||||||
|
|
||||||
// Create queue producer
|
// create queue producer
|
||||||
|
// TODO 像实时计算一样使用 channel 代替
|
||||||
producer, err := task.NewQueueProducer(ctx, modelRTConfig.RabbitMQConfig)
|
producer, err := task.NewQueueProducer(ctx, modelRTConfig.RabbitMQConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "Failed to create queue producer", "error", err)
|
logger.Error(ctx, "create rabbitMQ queue producer failed", "error", err)
|
||||||
// Continue without queue publishing
|
renderRespFailure(c, constants.RespCodeServerError, "create rabbitMQ queue producer failed", nil)
|
||||||
} else {
|
return
|
||||||
defer producer.Close()
|
|
||||||
|
|
||||||
// Publish task to queue
|
|
||||||
taskType := task.TaskType(request.TaskType)
|
|
||||||
priority := 5 // Default priority
|
|
||||||
|
|
||||||
if err := producer.PublishTaskWithRetry(ctx, asyncTask.TaskID, taskType, priority, 3); err != nil {
|
|
||||||
logger.Error(ctx, "Failed to publish task to queue",
|
|
||||||
"task_id", asyncTask.TaskID,
|
|
||||||
"error", err)
|
|
||||||
// Log error but don't affect task creation response
|
|
||||||
} else {
|
|
||||||
logger.Info(ctx, "Task published to queue successfully",
|
|
||||||
"task_id", asyncTask.TaskID,
|
|
||||||
"queue", task.TaskQueueName)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
defer producer.Close()
|
||||||
|
|
||||||
|
// publish task to queue
|
||||||
|
taskType := task.TaskType(request.TaskType)
|
||||||
|
priority := 5 // Default priority
|
||||||
|
|
||||||
|
if err := producer.PublishTaskWithRetry(ctx, asyncTask.TaskID, taskType, priority, 3); err != nil {
|
||||||
|
logger.Error(ctx, "publish task to rabbitMQ queue failed",
|
||||||
|
"task_id", asyncTask.TaskID, "error", err)
|
||||||
|
renderRespFailure(c, constants.RespCodeServerError, "publish task to rabbitMQ queue failed", nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "published task to rabbitMQ queue successfully",
|
||||||
|
"task_id", asyncTask.TaskID, "queue", constants.TaskQueueName)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "async task created successfully", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
|
logger.Info(ctx, "async task created success", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
|
||||||
|
|
||||||
// Return success response
|
// return success response
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
payload := genAsyncTaskCreatePayload(asyncTask.TaskID.String())
|
||||||
Code: 2000,
|
renderRespSuccess(c, constants.RespCodeSuccess, "task created successfully", payload)
|
||||||
Msg: "task created successfully",
|
|
||||||
Payload: network.AsyncTaskCreateResponse{
|
|
||||||
TaskID: asyncTask.TaskID,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateTaskParams(taskType string, params map[string]any) bool {
|
func validateTaskParams(taskType string, params map[string]any) bool {
|
||||||
|
|
@ -189,54 +166,9 @@ func validateTestTaskParams(params map[string]any) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitCommaSeparated(s string) []string {
|
func genAsyncTaskCreatePayload(taskID string) map[string]any {
|
||||||
var result []string
|
payload := map[string]any{
|
||||||
var current strings.Builder
|
"task_id": taskID,
|
||||||
inQuotes := false
|
|
||||||
escape := false
|
|
||||||
|
|
||||||
for _, ch := range s {
|
|
||||||
if escape {
|
|
||||||
current.WriteRune(ch)
|
|
||||||
escape = false
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ch {
|
|
||||||
case '\\':
|
|
||||||
escape = true
|
|
||||||
case '"':
|
|
||||||
inQuotes = !inQuotes
|
|
||||||
case ',':
|
|
||||||
if !inQuotes {
|
|
||||||
result = append(result, strings.TrimSpace(current.String()))
|
|
||||||
current.Reset()
|
|
||||||
} else {
|
|
||||||
current.WriteRune(ch)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
current.WriteRune(ch)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return payload
|
||||||
if current.Len() > 0 {
|
|
||||||
result = append(result, strings.TrimSpace(current.String()))
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDBFromContext(c *gin.Context) *gorm.DB {
|
|
||||||
// Try to get database connection from context
|
|
||||||
// This should be set by middleware
|
|
||||||
if db, exists := c.Get("db"); exists {
|
|
||||||
if gormDB, ok := db.(*gorm.DB); ok {
|
|
||||||
return gormDB
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback to global database connection
|
|
||||||
// This should be implemented based on your application's database setup
|
|
||||||
// For now, return nil - actual implementation should retrieve from application context
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -142,3 +143,40 @@ func AsyncTaskResultQueryHandler(c *gin.Context) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func splitCommaSeparated(s string) []string {
|
||||||
|
var result []string
|
||||||
|
var current strings.Builder
|
||||||
|
inQuotes := false
|
||||||
|
escape := false
|
||||||
|
|
||||||
|
for _, ch := range s {
|
||||||
|
if escape {
|
||||||
|
current.WriteRune(ch)
|
||||||
|
escape = false
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ch {
|
||||||
|
case '\\':
|
||||||
|
escape = true
|
||||||
|
case '"':
|
||||||
|
inQuotes = !inQuotes
|
||||||
|
case ',':
|
||||||
|
if !inQuotes {
|
||||||
|
result = append(result, strings.TrimSpace(current.String()))
|
||||||
|
current.Reset()
|
||||||
|
} else {
|
||||||
|
current.WriteRune(ch)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
current.WriteRune(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if current.Len() > 0 {
|
||||||
|
result = append(result, strings.TrimSpace(current.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,24 +3,17 @@ package task
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
|
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultPriority is the default task priority
|
|
||||||
const DefaultPriority = 5
|
|
||||||
|
|
||||||
// HighPriority represents high priority tasks
|
|
||||||
const HighPriority = 10
|
|
||||||
|
|
||||||
// LowPriority represents low priority tasks
|
|
||||||
const LowPriority = 1
|
|
||||||
|
|
||||||
// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch
|
// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch
|
||||||
// This struct is designed to be lightweight for efficient message transport
|
// This struct is designed to be lightweight for efficient message transport
|
||||||
type TaskQueueMessage struct {
|
type TaskQueueMessage struct {
|
||||||
TaskID uuid.UUID `json:"task_id"`
|
TaskID uuid.UUID `json:"task_id"`
|
||||||
TaskType TaskType `json:"task_type"`
|
TaskType TaskType `json:"task_type"`
|
||||||
Priority int `json:"priority,omitempty"` // Optional, defaults to DefaultPriority
|
Priority int `json:"priority,omitempty"` // Optional, defaults to constants.TaskPriorityDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTaskQueueMessage creates a new TaskQueueMessage with default priority
|
// NewTaskQueueMessage creates a new TaskQueueMessage with default priority
|
||||||
|
|
@ -28,7 +21,7 @@ func NewTaskQueueMessage(taskID uuid.UUID, taskType TaskType) *TaskQueueMessage
|
||||||
return &TaskQueueMessage{
|
return &TaskQueueMessage{
|
||||||
TaskID: taskID,
|
TaskID: taskID,
|
||||||
TaskType: taskType,
|
TaskType: taskType,
|
||||||
Priority: DefaultPriority,
|
Priority: constants.TaskPriorityDefault,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -41,12 +34,12 @@ func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priori
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToJSON converts the TaskQueueMessage to JSON bytes
|
// ToJSON converts TaskQueueMessage to JSON bytes
|
||||||
func (m *TaskQueueMessage) ToJSON() ([]byte, error) {
|
func (m *TaskQueueMessage) ToJSON() ([]byte, error) {
|
||||||
return json.Marshal(m)
|
return json.Marshal(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate checks if the TaskQueueMessage is valid
|
// Validate checks if TaskQueueMessage is valid
|
||||||
func (m *TaskQueueMessage) Validate() bool {
|
func (m *TaskQueueMessage) Validate() bool {
|
||||||
// Check if TaskID is valid (not nil UUID)
|
// Check if TaskID is valid (not nil UUID)
|
||||||
if m.TaskID == uuid.Nil {
|
if m.TaskID == uuid.Nil {
|
||||||
|
|
@ -55,25 +48,25 @@ func (m *TaskQueueMessage) Validate() bool {
|
||||||
|
|
||||||
// Check if TaskType is valid
|
// Check if TaskType is valid
|
||||||
switch m.TaskType {
|
switch m.TaskType {
|
||||||
case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport:
|
case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport, TypeTest:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPriority sets the priority of the task queue message with validation
|
// SetPriority sets priority of task queue message with validation
|
||||||
func (m *TaskQueueMessage) SetPriority(priority int) {
|
func (m *TaskQueueMessage) SetPriority(priority int) {
|
||||||
if priority < LowPriority {
|
if priority < constants.TaskPriorityLow {
|
||||||
priority = LowPriority
|
priority = constants.TaskPriorityLow
|
||||||
}
|
}
|
||||||
if priority > HighPriority {
|
if priority > constants.TaskPriorityHigh {
|
||||||
priority = HighPriority
|
priority = constants.TaskPriorityHigh
|
||||||
}
|
}
|
||||||
m.Priority = priority
|
m.Priority = priority
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPriority returns the priority of the task queue message
|
// GetPriority returns priority of task queue message
|
||||||
func (m *TaskQueueMessage) GetPriority() int {
|
func (m *TaskQueueMessage) GetPriority() int {
|
||||||
return m.Priority
|
return m.Priority
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/config"
|
"modelRT/config"
|
||||||
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/mq"
|
"modelRT/mq"
|
||||||
|
|
||||||
|
|
@ -15,19 +16,6 @@ import (
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// TaskExchangeName is the name of the exchange for task routing
|
|
||||||
TaskExchangeName = "modelrt.tasks.exchange"
|
|
||||||
// TaskQueueName is the name of the main task queue
|
|
||||||
TaskQueueName = "modelrt.tasks.queue"
|
|
||||||
// TaskRoutingKey is the routing key for task messages
|
|
||||||
TaskRoutingKey = "modelrt.task"
|
|
||||||
// MaxPriority is the maximum priority level for tasks (0-10)
|
|
||||||
MaxPriority = 10
|
|
||||||
// DefaultMessageTTL is the default time-to-live for task messages (24 hours)
|
|
||||||
DefaultMessageTTL = 24 * time.Hour
|
|
||||||
)
|
|
||||||
|
|
||||||
// QueueProducer handles publishing tasks to RabbitMQ
|
// QueueProducer handles publishing tasks to RabbitMQ
|
||||||
type QueueProducer struct {
|
type QueueProducer struct {
|
||||||
conn *amqp.Connection
|
conn *amqp.Connection
|
||||||
|
|
@ -67,13 +55,13 @@ func NewQueueProducer(ctx context.Context, cfg config.RabbitMQConfig) (*QueuePro
|
||||||
func (p *QueueProducer) declareInfrastructure() error {
|
func (p *QueueProducer) declareInfrastructure() error {
|
||||||
// Declare durable direct exchange
|
// Declare durable direct exchange
|
||||||
err := p.ch.ExchangeDeclare(
|
err := p.ch.ExchangeDeclare(
|
||||||
TaskExchangeName, // name
|
constants.TaskExchangeName, // name
|
||||||
"direct", // type
|
"direct", // type
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // auto-deleted
|
false, // auto-deleted
|
||||||
false, // internal
|
false, // internal
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
nil, // arguments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to declare exchange: %w", err)
|
return fmt.Errorf("failed to declare exchange: %w", err)
|
||||||
|
|
@ -81,14 +69,14 @@ func (p *QueueProducer) declareInfrastructure() error {
|
||||||
|
|
||||||
// Declare durable queue with priority support and message TTL
|
// Declare durable queue with priority support and message TTL
|
||||||
_, err = p.ch.QueueDeclare(
|
_, err = p.ch.QueueDeclare(
|
||||||
TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": MaxPriority, // support priority levels 0-10
|
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
|
||||||
"x-message-ttl": DefaultMessageTTL.Milliseconds(), // message TTL
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -97,11 +85,11 @@ func (p *QueueProducer) declareInfrastructure() error {
|
||||||
|
|
||||||
// Bind queue to exchange
|
// Bind queue to exchange
|
||||||
err = p.ch.QueueBind(
|
err = p.ch.QueueBind(
|
||||||
TaskQueueName, // queue name
|
constants.TaskQueueName, // queue name
|
||||||
TaskRoutingKey, // routing key
|
constants.TaskRoutingKey, // routing key
|
||||||
TaskExchangeName, // exchange name
|
constants.TaskExchangeName, // exchange name
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
nil, // arguments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to bind queue: %w", err)
|
return fmt.Errorf("failed to bind queue: %w", err)
|
||||||
|
|
@ -141,10 +129,10 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
|
||||||
// Publish to exchange
|
// Publish to exchange
|
||||||
err = p.ch.PublishWithContext(
|
err = p.ch.PublishWithContext(
|
||||||
ctx,
|
ctx,
|
||||||
TaskExchangeName, // exchange
|
constants.TaskExchangeName, // exchange
|
||||||
TaskRoutingKey, // routing key
|
constants.TaskRoutingKey, // routing key
|
||||||
false, // mandatory
|
false, // mandatory
|
||||||
false, // immediate
|
false, // immediate
|
||||||
publishing,
|
publishing,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -155,7 +143,7 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
|
||||||
"task_id", taskID.String(),
|
"task_id", taskID.String(),
|
||||||
"task_type", taskType,
|
"task_type", taskType,
|
||||||
"priority", priority,
|
"priority", priority,
|
||||||
"queue", TaskQueueName,
|
"queue", constants.TaskQueueName,
|
||||||
)
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -205,14 +193,14 @@ func (p *QueueProducer) Close() error {
|
||||||
// GetQueueInfo returns information about the task queue
|
// GetQueueInfo returns information about the task queue
|
||||||
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
||||||
queue, err := p.ch.QueueDeclarePassive(
|
queue, err := p.ch.QueueDeclarePassive(
|
||||||
TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": MaxPriority,
|
"x-max-priority": constants.TaskMaxPriority,
|
||||||
"x-message-ttl": DefaultMessageTTL.Milliseconds(),
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -223,5 +211,5 @@ func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
|
||||||
|
|
||||||
// PurgeQueue removes all messages from the task queue
|
// PurgeQueue removes all messages from the task queue
|
||||||
func (p *QueueProducer) PurgeQueue() (int, error) {
|
func (p *QueueProducer) PurgeQueue() (int, error) {
|
||||||
return p.ch.QueuePurge(TaskQueueName, false)
|
return p.ch.QueuePurge(constants.TaskQueueName, false)
|
||||||
}
|
}
|
||||||
|
|
@ -8,12 +8,13 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RetryStrategy defines the interface for task retry strategies
|
// RetryStrategy defines the interface for task retry strategies
|
||||||
type RetryStrategy interface {
|
type RetryStrategy interface {
|
||||||
// ShouldRetry determines if a task should be retried and returns the delay before next retry
|
// ShouldRetry determines if a task should be retried and returns delay before next retry
|
||||||
ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration)
|
ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration)
|
||||||
// GetMaxRetries returns the maximum number of retry attempts
|
// GetMaxRetries returns the maximum number of retry attempts
|
||||||
GetMaxRetries() int
|
GetMaxRetries() int
|
||||||
|
|
@ -98,7 +99,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
|
||||||
return true, delay
|
return true, delay
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMaxRetries returns the maximum number of retry attempts
|
// GetMaxRetries returns maximum number of retry attempts
|
||||||
func (s *ExponentialBackoffRetry) GetMaxRetries() int {
|
func (s *ExponentialBackoffRetry) GetMaxRetries() int {
|
||||||
return s.MaxRetries
|
return s.MaxRetries
|
||||||
}
|
}
|
||||||
|
|
@ -151,7 +152,7 @@ func (s *FixedDelayRetry) ShouldRetry(ctx context.Context, taskID string, retryC
|
||||||
return true, delay
|
return true, delay
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMaxRetries returns the maximum number of retry attempts
|
// GetMaxRetries returns maximum number of retry attempts
|
||||||
func (s *FixedDelayRetry) GetMaxRetries() int {
|
func (s *FixedDelayRetry) GetMaxRetries() int {
|
||||||
return s.MaxRetries
|
return s.MaxRetries
|
||||||
}
|
}
|
||||||
|
|
@ -177,10 +178,10 @@ func (s *NoRetryStrategy) GetMaxRetries() int {
|
||||||
// DefaultRetryStrategy returns the default retry strategy (exponential backoff)
|
// DefaultRetryStrategy returns the default retry strategy (exponential backoff)
|
||||||
func DefaultRetryStrategy() RetryStrategy {
|
func DefaultRetryStrategy() RetryStrategy {
|
||||||
return NewExponentialBackoffRetry(
|
return NewExponentialBackoffRetry(
|
||||||
3, // max retries
|
constants.TaskRetryMaxDefault, // max retries
|
||||||
1*time.Second, // initial delay
|
constants.TaskRetryInitialDelayDefault, // initial delay
|
||||||
5*time.Minute, // max delay
|
constants.TaskRetryMaxDelayDefault, // max delay
|
||||||
0.1, // random factor (10% jitter)
|
constants.TaskRetryRandomFactorDefault, // random factor (10% jitter)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
|
|
@ -17,7 +18,7 @@ import (
|
||||||
// TestTaskParams defines parameters for test task
|
// TestTaskParams defines parameters for test task
|
||||||
type TestTaskParams struct {
|
type TestTaskParams struct {
|
||||||
// SleepDuration specifies how long the task should sleep (in seconds)
|
// SleepDuration specifies how long the task should sleep (in seconds)
|
||||||
// Default is 60 seconds as per requirement
|
// Default is constants.TestTaskSleepDurationDefault seconds as per requirement
|
||||||
SleepDuration int `json:"sleep_duration"`
|
SleepDuration int `json:"sleep_duration"`
|
||||||
// Message is a custom message to include in the result
|
// Message is a custom message to include in the result
|
||||||
Message string `json:"message,omitempty"`
|
Message string `json:"message,omitempty"`
|
||||||
|
|
@ -25,14 +26,14 @@ type TestTaskParams struct {
|
||||||
|
|
||||||
// Validate checks if test task parameters are valid
|
// Validate checks if test task parameters are valid
|
||||||
func (p *TestTaskParams) Validate() error {
|
func (p *TestTaskParams) Validate() error {
|
||||||
// Default to 60 seconds if not specified
|
// Default to constants.TestTaskSleepDurationDefault seconds if not specified
|
||||||
if p.SleepDuration <= 0 {
|
if p.SleepDuration <= 0 {
|
||||||
p.SleepDuration = 60
|
p.SleepDuration = constants.TestTaskSleepDurationDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate max duration (max 1 hour)
|
// Validate max duration (max 1 hour)
|
||||||
if p.SleepDuration > 3600 {
|
if p.SleepDuration > constants.TestTaskSleepDurationMax {
|
||||||
return fmt.Errorf("sleep duration cannot exceed 3600 seconds (1 hour)")
|
return fmt.Errorf("sleep duration cannot exceed %d seconds (1 hour)", constants.TestTaskSleepDurationMax)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -90,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
||||||
return fmt.Errorf("invalid parameter type for TestTask")
|
return fmt.Errorf("invalid parameter type for TestTask")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info(ctx, "Starting test task execution",
|
logger.Info(ctx, "Starting test task executionser",
|
||||||
"task_id", taskID,
|
"task_id", taskID,
|
||||||
"sleep_duration_seconds", params.SleepDuration,
|
"sleep_duration_seconds", params.SleepDuration,
|
||||||
"message", params.Message,
|
"message", params.Message,
|
||||||
|
|
@ -149,7 +150,7 @@ func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, taskTyp
|
||||||
// Fetch task parameters from database
|
// Fetch task parameters from database
|
||||||
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
|
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to fetch task: %w", err)
|
return fmt.Errorf("failed toser fetch task: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert params map to TestTaskParams
|
// Convert params map to TestTaskParams
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ const (
|
||||||
TypeTopologyAnalysis TaskType = "TOPOLOGY_ANALYSIS"
|
TypeTopologyAnalysis TaskType = "TOPOLOGY_ANALYSIS"
|
||||||
TypeEventAnalysis TaskType = "EVENT_ANALYSIS"
|
TypeEventAnalysis TaskType = "EVENT_ANALYSIS"
|
||||||
TypeBatchImport TaskType = "BATCH_IMPORT"
|
TypeBatchImport TaskType = "BATCH_IMPORT"
|
||||||
|
TypeTest TaskType = "TEST"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/config"
|
"modelRT/config"
|
||||||
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/mq"
|
"modelRT/mq"
|
||||||
|
|
@ -112,14 +113,14 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
|
||||||
|
|
||||||
// Declare queue (ensure it exists with proper arguments)
|
// Declare queue (ensure it exists with proper arguments)
|
||||||
_, err = ch.QueueDeclare(
|
_, err = ch.QueueDeclare(
|
||||||
TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": MaxPriority,
|
"x-max-priority": constants.TaskMaxPriority,
|
||||||
"x-message-ttl": DefaultMessageTTL.Milliseconds(),
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -198,7 +199,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||||
|
|
||||||
// Consume messages from the queue
|
// Consume messages from the queue
|
||||||
msgs, err := w.ch.Consume(
|
msgs, err := w.ch.Consume(
|
||||||
TaskQueueName, // queue
|
constants.TaskQueueName, // queue
|
||||||
fmt.Sprintf("worker-%d", consumerID), // consumer tag
|
fmt.Sprintf("worker-%d", consumerID), // consumer tag
|
||||||
false, // auto-ack
|
false, // auto-ack
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
|
|
@ -462,14 +463,14 @@ func (w *TaskWorker) checkHealth() {
|
||||||
|
|
||||||
// Update queue depth
|
// Update queue depth
|
||||||
queue, err := w.ch.QueueDeclarePassive(
|
queue, err := w.ch.QueueDeclarePassive(
|
||||||
TaskQueueName, // name
|
constants.TaskQueueName, // name
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
amqp.Table{
|
amqp.Table{
|
||||||
"x-max-priority": MaxPriority,
|
"x-max-priority": constants.TaskMaxPriority,
|
||||||
"x-message-ttl": DefaultMessageTTL.Milliseconds(),
|
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue