diff --git a/task/handler_factory.go b/task/handler_factory.go new file mode 100644 index 0000000..9f6b7c2 --- /dev/null +++ b/task/handler_factory.go @@ -0,0 +1,247 @@ +// Package task provides asynchronous task processing with handler factory pattern +package task + +import ( + "context" + "fmt" + "sync" + + "modelRT/logger" + + "github.com/gofrs/uuid" + "gorm.io/gorm" +) + +// TaskHandler defines the interface for task processors +type TaskHandler interface { + // Execute processes a task with the given ID and type + Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error + // CanHandle returns true if this handler can process the given task type + CanHandle(taskType TaskType) bool + // Name returns the name of the handler for logging and metrics + Name() string +} + +// HandlerFactory creates task handlers based on task type +type HandlerFactory struct { + handlers map[TaskType]TaskHandler + mu sync.RWMutex +} + +// NewHandlerFactory creates a new HandlerFactory +func NewHandlerFactory() *HandlerFactory { + return &HandlerFactory{ + handlers: make(map[TaskType]TaskHandler), + } +} + +// RegisterHandler registers a handler for a specific task type +func (f *HandlerFactory) RegisterHandler(taskType TaskType, handler TaskHandler) { + f.mu.Lock() + defer f.mu.Unlock() + + f.handlers[taskType] = handler + logger.Info(context.Background(), "Handler registered", + "task_type", taskType, + "handler_name", handler.Name(), + ) +} + +// GetHandler returns a handler for the given task type +func (f *HandlerFactory) GetHandler(taskType TaskType) (TaskHandler, error) { + f.mu.RLock() + handler, exists := f.handlers[taskType] + f.mu.RUnlock() + + if !exists { + return nil, fmt.Errorf("no handler registered for task type: %s", taskType) + } + + return handler, nil +} + +// CreateDefaultHandlers registers all default task handlers +func (f *HandlerFactory) CreateDefaultHandlers() { + f.RegisterHandler(TypeTopologyAnalysis, &TopologyAnalysisHandler{}) + f.RegisterHandler(TypeEventAnalysis, &EventAnalysisHandler{}) + f.RegisterHandler(TypeBatchImport, &BatchImportHandler{}) +} + +// BaseHandler provides common functionality for all task handlers +type BaseHandler struct { + name string +} + +// NewBaseHandler creates a new BaseHandler +func NewBaseHandler(name string) *BaseHandler { + return &BaseHandler{name: name} +} + +// Name returns the handler name +func (h *BaseHandler) Name() string { + return h.name +} + +// TopologyAnalysisHandler handles topology analysis tasks +type TopologyAnalysisHandler struct { + BaseHandler +} + +// NewTopologyAnalysisHandler creates a new TopologyAnalysisHandler +func NewTopologyAnalysisHandler() *TopologyAnalysisHandler { + return &TopologyAnalysisHandler{ + BaseHandler: *NewBaseHandler("topology_analysis_handler"), + } +} + +// Execute processes a topology analysis task +func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { + logger.Info(ctx, "Starting topology analysis", + "task_id", taskID, + "task_type", taskType, + ) + + // TODO: Implement actual topology analysis logic + // This would typically involve: + // 1. Fetching task parameters from database + // 2. Performing topology analysis (checking for islands, shorts, etc.) + // 3. Storing results in database + // 4. Updating task status + + // Simulate work + logger.Info(ctx, "Topology analysis completed", + "task_id", taskID, + "task_type", taskType, + ) + + return nil +} + +// CanHandle returns true for topology analysis tasks +func (h *TopologyAnalysisHandler) CanHandle(taskType TaskType) bool { + return taskType == TypeTopologyAnalysis +} + +// EventAnalysisHandler handles event analysis tasks +type EventAnalysisHandler struct { + BaseHandler +} + +// NewEventAnalysisHandler creates a new EventAnalysisHandler +func NewEventAnalysisHandler() *EventAnalysisHandler { + return &EventAnalysisHandler{ + BaseHandler: *NewBaseHandler("event_analysis_handler"), + } +} + +// Execute processes an event analysis task +func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { + logger.Info(ctx, "Starting event analysis", + "task_id", taskID, + "task_type", taskType, + ) + + // TODO: Implement actual event analysis logic + // This would typically involve: + // 1. Fetching motor and trigger information + // 2. Analyzing events within the specified duration + // 3. Generating analysis report + // 4. Storing results in database + + // Simulate work + logger.Info(ctx, "Event analysis completed", + "task_id", taskID, + "task_type", taskType, + ) + + return nil +} + +// CanHandle returns true for event analysis tasks +func (h *EventAnalysisHandler) CanHandle(taskType TaskType) bool { + return taskType == TypeEventAnalysis +} + +// BatchImportHandler handles batch import tasks +type BatchImportHandler struct { + BaseHandler +} + +// NewBatchImportHandler creates a new BatchImportHandler +func NewBatchImportHandler() *BatchImportHandler { + return &BatchImportHandler{ + BaseHandler: *NewBaseHandler("batch_import_handler"), + } +} + +// Execute processes a batch import task +func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { + logger.Info(ctx, "Starting batch import", + "task_id", taskID, + "task_type", taskType, + ) + + // TODO: Implement actual batch import logic + // This would typically involve: + // 1. Reading file from specified path + // 2. Parsing file content (CSV, Excel, etc.) + // 3. Validating and importing data into database + // 4. Generating import report + + // Simulate work + logger.Info(ctx, "Batch import completed", + "task_id", taskID, + "task_type", taskType, + ) + + return nil +} + +// CanHandle returns true for batch import tasks +func (h *BatchImportHandler) CanHandle(taskType TaskType) bool { + return taskType == TypeBatchImport +} + +// CompositeHandler can handle multiple task types by delegating to appropriate handlers +type CompositeHandler struct { + factory *HandlerFactory +} + +// NewCompositeHandler creates a new CompositeHandler +func NewCompositeHandler(factory *HandlerFactory) *CompositeHandler { + return &CompositeHandler{factory: factory} +} + +// Execute delegates task execution to the appropriate handler +func (h *CompositeHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { + handler, err := h.factory.GetHandler(taskType) + if err != nil { + return fmt.Errorf("failed to get handler for task type %s: %w", taskType, err) + } + + return handler.Execute(ctx, taskID, taskType, db) +} + +// CanHandle returns true if any registered handler can handle the task type +func (h *CompositeHandler) CanHandle(taskType TaskType) bool { + _, err := h.factory.GetHandler(taskType) + return err == nil +} + +// Name returns the composite handler name +func (h *CompositeHandler) Name() string { + return "composite_handler" +} + +// DefaultHandlerFactory returns a HandlerFactory with all default handlers registered +func DefaultHandlerFactory() *HandlerFactory { + factory := NewHandlerFactory() + factory.CreateDefaultHandlers() + return factory +} + +// DefaultCompositeHandler returns a CompositeHandler with all default handlers +func DefaultCompositeHandler() TaskHandler { + factory := DefaultHandlerFactory() + return NewCompositeHandler(factory) +} \ No newline at end of file diff --git a/task/queue_message.go b/task/queue_message.go index 0c6512f..7ea0164 100644 --- a/task/queue_message.go +++ b/task/queue_message.go @@ -1,6 +1,8 @@ package task import ( + "encoding/json" + "github.com/gofrs/uuid" ) @@ -41,7 +43,7 @@ func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priori // ToJSON converts the TaskQueueMessage to JSON bytes func (m *TaskQueueMessage) ToJSON() ([]byte, error) { - return []byte{}, nil // Placeholder - actual implementation would use json.Marshal + return json.Marshal(m) } // Validate checks if the TaskQueueMessage is valid diff --git a/task/queue_producer.go b/task/queue_producer.go new file mode 100644 index 0000000..b9f2df1 --- /dev/null +++ b/task/queue_producer.go @@ -0,0 +1,227 @@ +// Package task provides asynchronous task processing with RabbitMQ integration +package task + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "modelRT/config" + "modelRT/logger" + "modelRT/mq" + + "github.com/gofrs/uuid" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + // TaskExchangeName is the name of the exchange for task routing + TaskExchangeName = "modelrt.tasks.exchange" + // TaskQueueName is the name of the main task queue + TaskQueueName = "modelrt.tasks.queue" + // TaskRoutingKey is the routing key for task messages + TaskRoutingKey = "modelrt.task" + // MaxPriority is the maximum priority level for tasks (0-10) + MaxPriority = 10 + // DefaultMessageTTL is the default time-to-live for task messages (24 hours) + DefaultMessageTTL = 24 * time.Hour +) + +// QueueProducer handles publishing tasks to RabbitMQ +type QueueProducer struct { + conn *amqp.Connection + ch *amqp.Channel +} + +// NewQueueProducer creates a new QueueProducer instance +func NewQueueProducer(ctx context.Context, cfg config.RabbitMQConfig) (*QueueProducer, error) { + // Initialize RabbitMQ connection if not already initialized + mq.InitRabbitProxy(ctx, cfg) + + conn := mq.GetConn() + if conn == nil { + return nil, fmt.Errorf("failed to get RabbitMQ connection") + } + + ch, err := conn.Channel() + if err != nil { + return nil, fmt.Errorf("failed to open channel: %w", err) + } + + producer := &QueueProducer{ + conn: conn, + ch: ch, + } + + // Declare exchange and queue + if err := producer.declareInfrastructure(); err != nil { + ch.Close() + return nil, fmt.Errorf("failed to declare infrastructure: %w", err) + } + + return producer, nil +} + +// declareInfrastructure declares the exchange, queue, and binds them +func (p *QueueProducer) declareInfrastructure() error { + // Declare durable direct exchange + err := p.ch.ExchangeDeclare( + TaskExchangeName, // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare exchange: %w", err) + } + + // Declare durable queue with priority support and message TTL + _, err = p.ch.QueueDeclare( + TaskQueueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp.Table{ + "x-max-priority": MaxPriority, // support priority levels 0-10 + "x-message-ttl": DefaultMessageTTL.Milliseconds(), // message TTL + }, + ) + if err != nil { + return fmt.Errorf("failed to declare queue: %w", err) + } + + // Bind queue to exchange + err = p.ch.QueueBind( + TaskQueueName, // queue name + TaskRoutingKey, // routing key + TaskExchangeName, // exchange name + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to bind queue: %w", err) + } + + return nil +} + +// PublishTask publishes a task message to RabbitMQ +func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskType TaskType, priority int) error { + message := NewTaskQueueMessageWithPriority(taskID, taskType, priority) + + // Validate message + if !message.Validate() { + return fmt.Errorf("invalid task message: taskID=%s, taskType=%s", taskID, taskType) + } + + // Convert message to JSON + body, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal task message: %w", err) + } + + // Prepare publishing options + publishing := amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, // Persistent messages survive broker restart + Timestamp: time.Now(), + Priority: uint8(priority), + Headers: amqp.Table{ + "task_id": taskID.String(), + "task_type": string(taskType), + }, + } + + // Publish to exchange + err = p.ch.PublishWithContext( + ctx, + TaskExchangeName, // exchange + TaskRoutingKey, // routing key + false, // mandatory + false, // immediate + publishing, + ) + if err != nil { + return fmt.Errorf("failed to publish task message: %w", err) + } + + logger.Info(ctx, "Task published to queue", + "task_id", taskID.String(), + "task_type", taskType, + "priority", priority, + "queue", TaskQueueName, + ) + + return nil +} + +// PublishTaskWithRetry publishes a task with retry logic +func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UUID, taskType TaskType, priority int, maxRetries int) error { + var lastErr error + for i := range maxRetries { + err := p.PublishTask(ctx, taskID, taskType, priority) + if err == nil { + return nil + } + lastErr = err + + // Exponential backoff + backoff := time.Duration(1<