// Source file: modelRT/task/queue_producer.go (Go, 266 lines, 7.4 KiB)

// Package task provides asynchronous task processing with RabbitMQ integration
package task
import (
"context"
"encoding/json"
"fmt"
"time"
"modelRT/config"
"modelRT/constants"
"modelRT/logger"
"modelRT/mq"
"github.com/gofrs/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
)
// taskMsgChanCapacity is the number of task messages TaskMsgChan can hold
// before a send on it blocks.
const taskMsgChanCapacity = 10000

// TaskMsgChan buffers task messages to be published to RabbitMQ asynchronously.
//
// The channel is created as part of package variable initialization rather
// than in an init function, so it is never observed as nil by other
// package-level initializers or init functions of this package.
var TaskMsgChan = make(chan *TaskQueueMessage, taskMsgChanCapacity)
// QueueProducer handles publishing tasks to RabbitMQ.
//
// Both fields are populated by NewQueueProducer: the connection comes from
// mq.GetConn and the channel is opened on that connection.
type QueueProducer struct {
// conn is the RabbitMQ connection the producer's channel was opened on.
conn *amqp.Connection
// ch is the AMQP channel used by this producer for declaring the exchange
// and queue, publishing, inspecting and purging. Close closes only this
// channel.
ch *amqp.Channel
}
// NewQueueProducer creates a QueueProducer that is ready to publish tasks.
//
// It calls mq.InitRabbitProxy with ctx and cfg, takes the connection returned
// by mq.GetConn, opens a channel on it and declares the task exchange, queue
// and binding through declareInfrastructure.
//
// An error is returned when mq.GetConn yields no connection, when the channel
// cannot be opened, or when declaring the infrastructure fails. In the last
// case the channel opened here is closed before returning.
func NewQueueProducer(ctx context.Context, cfg config.RabbitMQConfig) (*QueueProducer, error) {
	// Initialize RabbitMQ connection if not already initialized
	mq.InitRabbitProxy(ctx, cfg)

	rabbitConn := mq.GetConn()
	if rabbitConn == nil {
		return nil, fmt.Errorf("failed to get RabbitMQ connection")
	}

	channel, openErr := rabbitConn.Channel()
	if openErr != nil {
		return nil, fmt.Errorf("failed to open channel: %w", openErr)
	}

	p := &QueueProducer{conn: rabbitConn, ch: channel}

	// Declare exchange and queue; on success the producer is ready to use.
	declareErr := p.declareInfrastructure()
	if declareErr == nil {
		return p, nil
	}

	// The channel is of no use without the infrastructure, so release it.
	channel.Close()
	return nil, fmt.Errorf("failed to declare infrastructure: %w", declareErr)
}
// declareInfrastructure sets up everything PublishTask relies on, in order:
// the task exchange, the task queue, and the binding between them under
// constants.TaskRoutingKey. The first failing step aborts the sequence and
// its error is returned wrapped with a description of that step.
func (p *QueueProducer) declareInfrastructure() error {
	// Flag values shared by the declarations below, named for readability.
	const (
		exchangeKind = "direct"
		durable      = true
		autoDelete   = false
		internal     = false
		exclusive    = false
		noWait       = false
	)

	// Durable direct exchange.
	if err := p.ch.ExchangeDeclare(
		constants.TaskExchangeName,
		exchangeKind,
		durable,
		autoDelete,
		internal,
		noWait,
		nil, // no extra arguments
	); err != nil {
		return fmt.Errorf("failed to declare exchange: %w", err)
	}

	// Durable queue with priority support and a per-message TTL.
	queueArgs := amqp.Table{
		"x-max-priority": constants.TaskMaxPriority,                      // highest priority level the queue supports
		"x-message-ttl":  constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL in milliseconds
	}
	if _, err := p.ch.QueueDeclare(
		constants.TaskQueueName,
		durable,
		autoDelete,
		exclusive,
		noWait,
		queueArgs,
	); err != nil {
		return fmt.Errorf("failed to declare queue: %w", err)
	}

	// Route messages published with the task routing key into the task queue.
	if err := p.ch.QueueBind(
		constants.TaskQueueName,
		constants.TaskRoutingKey,
		constants.TaskExchangeName,
		noWait,
		nil, // no extra arguments
	); err != nil {
		return fmt.Errorf("failed to bind queue: %w", err)
	}

	return nil
}
// PublishTask publishes a single task message to RabbitMQ.
//
// A TaskQueueMessage is built from taskID, taskType and priority, validated,
// annotated with the OpenTelemetry trace context carried by ctx, serialized
// as JSON and published as a persistent message to constants.TaskExchangeName
// with constants.TaskRoutingKey.
//
// The message body carries priority unchanged. The AMQP priority property is
// a single byte, so the value placed there is clamped to [0, 255] first; a
// plain conversion would wrap out-of-range values (for example -1 would
// become 255).
//
// An error is returned when the message fails validation, cannot be
// marshalled, or the publish call fails.
func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskType TaskType, priority int) error {
	// maxAMQPPriority is the largest value the one-byte AMQP priority
	// property can represent.
	const maxAMQPPriority = 255

	message := NewTaskQueueMessageWithPriority(taskID, taskType, priority)

	// Validate message
	if !message.Validate() {
		return fmt.Errorf("invalid task message: taskID=%s, taskType=%s", taskID, taskType)
	}

	// Inject OTel trace context so the consumer (worker) can restore the span chain
	carrier := make(map[string]string)
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier))
	message.TraceCarrier = carrier

	// Convert message to JSON
	body, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal task message: %w", err)
	}

	// Clamp before narrowing so an out-of-range priority cannot wrap around.
	amqpPriority := uint8(min(max(priority, 0), maxAMQPPriority))

	// Prepare publishing options
	publishing := amqp.Publishing{
		ContentType:  "application/json",
		Body:         body,
		DeliveryMode: amqp.Persistent, // Persistent messages survive broker restart
		Timestamp:    time.Now(),
		Priority:     amqpPriority,
		Headers: amqp.Table{
			"task_id":   taskID.String(),
			"task_type": string(taskType),
		},
	}

	// Publish to exchange
	err = p.ch.PublishWithContext(
		ctx,
		constants.TaskExchangeName, // exchange
		constants.TaskRoutingKey,   // routing key
		false,                      // mandatory
		false,                      // immediate
		publishing,
	)
	if err != nil {
		return fmt.Errorf("failed to publish task message: %w", err)
	}

	logger.Info(ctx, "Task published to queue",
		"task_id", taskID.String(),
		"task_type", taskType,
		"priority", priority,
		"queue", constants.TaskQueueName,
	)
	return nil
}
// PublishTaskWithRetry publishes a task, retrying failed attempts with
// exponential backoff.
//
// maxRetries is the total number of publish attempts. Values below 1 are
// treated as 1, so the task is always tried at least once. Between attempts
// the call waits 1s, 2s, 4s, ... capped at 10s. No wait follows the final
// attempt: its failure is reported immediately.
//
// It returns nil as soon as one attempt succeeds, ctx.Err() if ctx is
// cancelled while waiting between attempts, and otherwise the last publish
// error wrapped with the number of attempts made.
func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UUID, taskType TaskType, priority int, maxRetries int) error {
	const (
		maxBackoff = 10 * time.Second
		// 1<<4 seconds already exceeds maxBackoff; capping the shift keeps the
		// duration arithmetic from overflowing for large attempt counts.
		maxBackoffShift = 4
	)

	attempts := max(maxRetries, 1)

	var lastErr error
	for i := range attempts {
		err := p.PublishTask(ctx, taskID, taskType, priority)
		if err == nil {
			return nil
		}
		lastErr = err

		// Nothing left to retry: report the failure without sleeping.
		if i == attempts-1 {
			break
		}

		// Exponential backoff
		backoff := min(time.Duration(1<<min(i, maxBackoffShift))*time.Second, maxBackoff)

		logger.Warn(ctx, "Failed to publish task, retrying",
			"task_id", taskID.String(),
			"attempt", i+1,
			"max_retries", maxRetries,
			"backoff", backoff,
			"error", err,
		)

		timer := time.NewTimer(backoff)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}

	return fmt.Errorf("failed to publish task after %d retries: %w", attempts, lastErr)
}
// Close releases the producer's channel and returns the error reported by
// closing it. When the producer has no channel it does nothing and returns
// nil. The connection held by the producer is not closed here.
func (p *QueueProducer) Close() error {
	if p.ch == nil {
		return nil
	}
	return p.ch.Close()
}
// GetQueueInfo returns information about the task queue.
//
// It issues a passive declare for constants.TaskQueueName using the same
// flags and arguments as declareInfrastructure and returns a pointer to the
// resulting amqp.Queue. A failing declare is returned wrapped as
// "failed to inspect queue".
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
	const (
		durable    = true
		autoDelete = false
		exclusive  = false
		noWait     = false
	)

	declareArgs := amqp.Table{
		"x-max-priority": constants.TaskMaxPriority,
		"x-message-ttl":  constants.TaskDefaultMessageTTL.Milliseconds(),
	}

	info, err := p.ch.QueueDeclarePassive(
		constants.TaskQueueName,
		durable,
		autoDelete,
		exclusive,
		noWait,
		declareArgs,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to inspect queue: %w", err)
	}
	return &info, nil
}
// PurgeQueue removes all messages from the task queue. It forwards the count
// and error returned by the channel's QueuePurge call unchanged.
func (p *QueueProducer) PurgeQueue() (int, error) {
	const noWait = false
	purged, err := p.ch.QueuePurge(constants.TaskQueueName, noWait)
	return purged, err
}
// PushTaskToRabbitMQ reads from taskChan and publishes to RabbitMQ.
// Must be run as a goroutine; blocks until ctx is cancelled or taskChan is closed.
//
// A QueueProducer is created on entry and closed on return; if it cannot be
// created the error is logged and the function returns immediately. Each
// received message is published with PublishTaskWithRetry inside a
// "task.publish" span whose parent is restored from the message's
// TraceCarrier. Publish failures are recorded on the span and logged, and
// the loop continues with the next message.
//
// Nil messages are skipped with a warning rather than dereferenced, so a
// stray nil sent on the channel cannot panic the publishing goroutine.
func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan chan *TaskQueueMessage) {
	// publishAttempts is the attempt budget handed to PublishTaskWithRetry.
	const publishAttempts = 3

	producer, err := NewQueueProducer(ctx, cfg)
	if err != nil {
		logger.Error(ctx, "init task queue producer failed", "error", err)
		return
	}
	defer producer.Close()

	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
			return
		case msg, ok := <-taskChan:
			if !ok {
				logger.Info(ctx, "task channel closed, exiting push loop")
				return
			}
			if msg == nil {
				logger.Warn(ctx, "skipping nil task message")
				continue
			}

			// Restore trace context from the handler that enqueued this message
			taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
			taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
				oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())),
			)

			if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, publishAttempts); err != nil {
				pubSpan.RecordError(err)
				logger.Error(taskCtx, "publish task to RabbitMQ failed",
					"task_id", msg.TaskID, "error", err)
			}
			pubSpan.End()
		}
	}
}