style: normalize log messages to lowercase across task package
- lowercase first letter of all logger.Info/Warn/Error message strings
in task/worker.go, task/retry_queue.go, task/handler_factory.go,
task/metrics_logger.go, task/retry_manager.go, task/queue_producer.go,
task/initializer.go, task/test_task.go, and main.go
- fix inline comments in main.go that mixed Chinese and uppercase English
- align Dockerfile comment casing with project convention
This commit is contained in:
parent
c17ddb80b9
commit
c6545e29ba
|
|
@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \
|
|||
-mod=readonly \
|
||||
-o modelrt main.go
|
||||
|
||||
# Prepare runtime dependencies in a pinned Alpine stage so they can be
|
||||
# copied into scratch without pulling any vulnerable OS packages at run time.
|
||||
# prepare runtime dependencies in a pinned alpine stage so they can be
|
||||
# copied into scratch without pulling any vulnerable os packages at run time.
|
||||
FROM alpine:3.21 AS certs
|
||||
ARG USER_ID=1000
|
||||
RUN apk --no-cache add ca-certificates tzdata && \
|
||||
|
|
|
|||
8
main.go
8
main.go
|
|
@ -179,8 +179,8 @@ func main() {
|
|||
// init async task worker
|
||||
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to initialize task worker", "error", err)
|
||||
// Continue without task worker, but log warning
|
||||
logger.Error(ctx, "failed to initialize task worker", "error", err)
|
||||
// continue without task worker, but log warning
|
||||
} else {
|
||||
go taskWorker.Start()
|
||||
defer taskWorker.Stop()
|
||||
|
|
@ -258,7 +258,7 @@ func main() {
|
|||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
engine := gin.New()
|
||||
// 添加CORS中间件
|
||||
// add CORS middleware
|
||||
engine.Use(cors.New(cors.Config{
|
||||
AllowOrigins: []string{"*"}, // 或指定具体域名
|
||||
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
|
||||
|
|
@ -278,7 +278,7 @@ func main() {
|
|||
Handler: engine,
|
||||
}
|
||||
|
||||
// creating a System Signal Receiver
|
||||
// creating a system signal receiver
|
||||
done := make(chan os.Signal, 10)
|
||||
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType,
|
|||
defer f.mu.Unlock()
|
||||
|
||||
f.handlers[taskType] = handler
|
||||
logger.Info(ctx, "Handler registered",
|
||||
logger.Info(ctx, "handler registered",
|
||||
"task_type", taskType,
|
||||
"handler_name", handler.Name(),
|
||||
)
|
||||
|
|
@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler {
|
|||
|
||||
// Execute processes an event analysis task
|
||||
func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Starting event analysis",
|
||||
logger.Info(ctx, "starting event analysis",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
)
|
||||
|
|
@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa
|
|||
// 4. Storing results in database
|
||||
|
||||
// Simulate work
|
||||
logger.Info(ctx, "Event analysis completed",
|
||||
logger.Info(ctx, "event analysis completed",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler {
|
|||
|
||||
// Execute processes a batch import task
|
||||
func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Starting batch import",
|
||||
logger.Info(ctx, "starting batch import",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para
|
|||
// 4. Generating import report
|
||||
|
||||
// Simulate work
|
||||
logger.Info(ctx, "Batch import completed",
|
||||
logger.Info(ctx, "batch import completed",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D
|
|||
return nil, fmt.Errorf("failed to create task worker: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task worker initialized",
|
||||
logger.Info(ctx, "task worker initialized",
|
||||
"worker_pool_size", workerCfg.PoolSize,
|
||||
"queue_consumers", workerCfg.QueueConsumerCount,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger {
|
|||
|
||||
// LogTaskMetrics records task processing metrics
|
||||
func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) {
|
||||
logger.Info(m.ctx, "Task metrics",
|
||||
logger.Info(m.ctx, "task metrics",
|
||||
"task_type", taskType,
|
||||
"status", status,
|
||||
"processing_time_ms", processingTime.Milliseconds(),
|
||||
|
|
@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process
|
|||
|
||||
// LogQueueMetrics records queue metrics
|
||||
func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) {
|
||||
logger.Info(m.ctx, "Queue metrics",
|
||||
logger.Info(m.ctx, "queue metrics",
|
||||
"queue_depth", queueDepth,
|
||||
"queue_latency_ms", queueLatency.Milliseconds(),
|
||||
"metric_type", "queue",
|
||||
|
|
@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati
|
|||
|
||||
// LogWorkerMetrics records worker metrics
|
||||
func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) {
|
||||
logger.Info(m.ctx, "Worker metrics",
|
||||
logger.Info(m.ctx, "worker metrics",
|
||||
"active_workers", activeWorkers,
|
||||
"idle_workers", idleWorkers,
|
||||
"total_workers", totalWorkers,
|
||||
|
|
@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker
|
|||
|
||||
// LogRetryMetrics records retry metrics
|
||||
func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) {
|
||||
logger.Info(m.ctx, "Retry metrics",
|
||||
logger.Info(m.ctx, "retry metrics",
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount,
|
||||
"retry_success", success,
|
||||
|
|
@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
|||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
logger.Info(m.ctx, "System metrics",
|
||||
logger.Info(m.ctx, "system metrics",
|
||||
"metric_type", "system",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"goroutines", runtime.NumGoroutine(),
|
||||
|
|
@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() {
|
|||
func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) {
|
||||
duration := endTime.Sub(startTime)
|
||||
|
||||
logger.Info(m.ctx, "Task completion metrics",
|
||||
logger.Info(m.ctx, "task completion metrics",
|
||||
"metric_type", "task_completion",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"task_id", taskID,
|
||||
|
|
@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string
|
|||
|
||||
// LogHealthCheckMetrics records health check metrics
|
||||
func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) {
|
||||
logger.Info(m.ctx, "Health check metrics",
|
||||
logger.Info(m.ctx, "health check metrics",
|
||||
"metric_type", "health_check",
|
||||
"timestamp", time.Now().Unix(),
|
||||
"healthy", healthy,
|
||||
|
|
|
|||
|
|
@ -156,7 +156,7 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
|
|||
return fmt.Errorf("failed to publish task message: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task published to queue",
|
||||
logger.Info(ctx, "task published to queue",
|
||||
"task_id", taskID.String(),
|
||||
"task_type", taskType,
|
||||
"priority", priority,
|
||||
|
|
@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU
|
|||
backoff := time.Duration(1<<uint(i)) * time.Second
|
||||
backoff = min(backoff, 10*time.Second)
|
||||
|
||||
logger.Warn(ctx, "Failed to publish task, retrying",
|
||||
logger.Warn(ctx, "failed to publish task, retrying",
|
||||
"task_id", taskID.String(),
|
||||
"attempt", i+1,
|
||||
"max_retries", maxRetries,
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ func NewExponentialBackoffRetry(maxRetries int, initialDelay, maxDelay time.Dura
|
|||
// ShouldRetry implements exponential backoff with jitter
|
||||
func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string, retryCount int, lastError error) (bool, time.Duration) {
|
||||
if retryCount >= s.MaxRetries {
|
||||
logger.Info(ctx, "Task reached maximum retry count",
|
||||
logger.Info(ctx, "task reached maximum retry count",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"max_retries", s.MaxRetries,
|
||||
|
|
@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
|
|||
}
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task will be retried",
|
||||
logger.Info(ctx, "task will be retried",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"next_retry_in", delay,
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError)
|
||||
if !shouldRetry {
|
||||
// Mark task as permanently failed
|
||||
logger.Info(ctx, "Task will not be retried, marking as failed",
|
||||
logger.Info(ctx, "task will not be retried, marking as failed",
|
||||
"task_id", taskID,
|
||||
"retry_count", retryCount,
|
||||
"max_retries", q.strategy.GetMaxRetries(),
|
||||
|
|
@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
}
|
||||
if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil {
|
||||
// Log but don't fail the whole retry scheduling
|
||||
logger.Warn(ctx, "Failed to update task error info",
|
||||
logger.Warn(ctx, "failed to update task error info",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to schedule task retry",
|
||||
logger.Error(ctx, "failed to schedule task retry",
|
||||
"task_id", taskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount,
|
||||
|
|
@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy
|
|||
return err
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Task scheduled for retry",
|
||||
logger.Info(ctx, "task scheduled for retry",
|
||||
"task_id", taskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", retryCount+1,
|
||||
|
|
@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
// Get tasks due for retry
|
||||
tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to get tasks for retry", "error", err)
|
||||
logger.Error(ctx, "failed to get tasks for retry", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Processing retry queue",
|
||||
logger.Info(ctx, "processing retry queue",
|
||||
"task_count", len(tasks),
|
||||
"batch_size", batchSize,
|
||||
)
|
||||
|
|
@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
// Publish task to queue for immediate processing
|
||||
taskType := TaskType(task.TaskType)
|
||||
if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil {
|
||||
logger.Error(ctx, "Failed to publish retry task to queue",
|
||||
logger.Error(ctx, "failed to publish retry task to queue",
|
||||
"task_id", task.TaskID,
|
||||
"task_type", taskType,
|
||||
"error", err,
|
||||
|
|
@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
|
||||
// Update task status back to submitted
|
||||
if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil {
|
||||
logger.Warn(ctx, "Failed to update retry task status",
|
||||
logger.Warn(ctx, "failed to update retry task status",
|
||||
"task_id", task.TaskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error
|
|||
|
||||
// Clear next retry time since task is being retried now
|
||||
if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil {
|
||||
logger.Warn(ctx, "Failed to clear next retry time",
|
||||
logger.Warn(ctx, "failed to clear next retry time",
|
||||
"task_id", task.TaskID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Retry task resubmitted",
|
||||
logger.Info(ctx, "retry task resubmitted",
|
||||
"task_id", task.TaskID,
|
||||
"task_type", taskType,
|
||||
"retry_count", task.RetryCount,
|
||||
|
|
@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "Retry scheduler stopping")
|
||||
logger.Info(ctx, "retry scheduler stopping")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := q.ProcessRetryQueue(ctx, batchSize); err != nil {
|
||||
logger.Error(ctx, "Error processing retry queue", "error", err)
|
||||
logger.Error(ctx, "error processing retry queue", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
|||
return fmt.Errorf("invalid parameter type for TestTask")
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Starting test task executionser",
|
||||
logger.Info(ctx, "starting test task executionser",
|
||||
"task_id", taskID,
|
||||
"sleep_duration_seconds", params.SleepDuration,
|
||||
"message", params.Message,
|
||||
|
|
@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
|
|||
|
||||
// Save result to database
|
||||
if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil {
|
||||
logger.Error(ctx, "Failed to save test task result",
|
||||
logger.Error(ctx, "failed to save test task result",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
return fmt.Errorf("failed to save task result: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Test task completed successfully",
|
||||
logger.Info(ctx, "test task completed successfully",
|
||||
"task_id", taskID,
|
||||
"sleep_duration_seconds", params.SleepDuration,
|
||||
)
|
||||
|
|
@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler {
|
|||
|
||||
// Execute processes a test task using the unified task interface
|
||||
func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error {
|
||||
logger.Info(ctx, "Executing test task",
|
||||
logger.Info(ctx, "executing test task",
|
||||
"task_id", taskID,
|
||||
"task_params", params,
|
||||
"db", db,
|
||||
|
|
|
|||
|
|
@ -178,7 +178,7 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg
|
|||
|
||||
// Start begins consuming tasks from the queue
|
||||
func (w *TaskWorker) Start() error {
|
||||
logger.Info(w.ctx, "Starting task worker",
|
||||
logger.Info(w.ctx, "starting task worker",
|
||||
"pool_size", w.cfg.PoolSize,
|
||||
"queue_consumers", w.cfg.QueueConsumerCount,
|
||||
)
|
||||
|
|
@ -193,7 +193,7 @@ func (w *TaskWorker) Start() error {
|
|||
w.wg.Add(1)
|
||||
go w.healthCheckLoop()
|
||||
|
||||
logger.Info(w.ctx, "Task worker started successfully")
|
||||
logger.Info(w.ctx, "task worker started successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -201,7 +201,7 @@ func (w *TaskWorker) Start() error {
|
|||
func (w *TaskWorker) consumerLoop(consumerID int) {
|
||||
defer w.wg.Done()
|
||||
|
||||
logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID)
|
||||
logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID)
|
||||
|
||||
// Consume messages from the queue
|
||||
msgs, err := w.ch.Consume(
|
||||
|
|
@ -214,7 +214,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error(w.ctx, "Failed to start consumer",
|
||||
logger.Error(w.ctx, "failed to start consumer",
|
||||
"consumer_id", consumerID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -224,11 +224,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
for {
|
||||
select {
|
||||
case <-w.stopChan:
|
||||
logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID)
|
||||
logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID)
|
||||
return
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID)
|
||||
logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -237,7 +237,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) {
|
|||
w.handleMessage(msg)
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(w.ctx, "Failed to submit task to pool",
|
||||
logger.Error(w.ctx, "failed to submit task to pool",
|
||||
"consumer_id", consumerID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -265,7 +265,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
// Parse task message
|
||||
var taskMsg TaskQueueMessage
|
||||
if err := json.Unmarshal(msg.Body, &taskMsg); err != nil {
|
||||
logger.Error(ctx, "Failed to unmarshal task message", "error", err)
|
||||
logger.Error(ctx, "failed to unmarshal task message", "error", err)
|
||||
msg.Nack(false, false) // Reject without requeue
|
||||
w.metrics.mu.Lock()
|
||||
w.metrics.TotalFailed++
|
||||
|
|
@ -275,7 +275,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Validate message
|
||||
if !taskMsg.Validate() {
|
||||
logger.Error(ctx, "Invalid task message",
|
||||
logger.Error(ctx, "invalid task message",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
)
|
||||
|
|
@ -299,7 +299,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
defer span.End()
|
||||
ctx = taskCtx
|
||||
|
||||
logger.Info(ctx, "Processing task",
|
||||
logger.Info(ctx, "processing task",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"priority", taskMsg.Priority,
|
||||
|
|
@ -307,7 +307,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to RUNNING in database
|
||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil {
|
||||
logger.Error(ctx, "Failed to update task status", "error", err)
|
||||
logger.Error(ctx, "failed to update task status", "error", err)
|
||||
msg.Nack(false, true) // Reject with requeue
|
||||
w.metrics.mu.Lock()
|
||||
w.metrics.TotalFailed++
|
||||
|
|
@ -326,7 +326,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
processingTime := time.Since(startTime)
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Task execution failed",
|
||||
logger.Error(ctx, "task execution failed",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"processing_time", processingTime,
|
||||
|
|
@ -335,7 +335,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to FAILED
|
||||
if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task with error", "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task with error", "error", updateErr)
|
||||
}
|
||||
|
||||
if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil {
|
||||
|
|
@ -353,7 +353,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
|
||||
// Update task status to COMPLETED
|
||||
if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil {
|
||||
logger.Error(ctx, "Failed to update task status to completed", "error", err)
|
||||
logger.Error(ctx, "failed to update task status to completed", "error", err)
|
||||
// Still ack the message since task was processed successfully
|
||||
}
|
||||
|
||||
|
|
@ -364,7 +364,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) {
|
|||
// Acknowledge message
|
||||
msg.Ack(false)
|
||||
|
||||
logger.Info(ctx, "Task completed successfully",
|
||||
logger.Info(ctx, "task completed successfully",
|
||||
"task_id", taskMsg.TaskID,
|
||||
"task_type", taskMsg.TaskType,
|
||||
"processing_time", processingTime,
|
||||
|
|
@ -398,7 +398,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
// Update task status in database
|
||||
err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "Failed to update task status in database",
|
||||
logger.Error(ctx, "failed to update task status in database",
|
||||
"task_id", taskID,
|
||||
"status", status,
|
||||
"error", err,
|
||||
|
|
@ -410,7 +410,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
if status == StatusRunning {
|
||||
startedAt := time.Now().Unix()
|
||||
if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to update task start time",
|
||||
logger.Warn(ctx, "failed to update task start time",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -423,14 +423,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
finishedAt := time.Now().Unix()
|
||||
if status == StatusCompleted {
|
||||
if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to mark task as completed",
|
||||
logger.Warn(ctx, "failed to mark task as completed",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil {
|
||||
logger.Warn(ctx, "Failed to mark task as failed",
|
||||
logger.Warn(ctx, "failed to mark task as failed",
|
||||
"task_id", taskID,
|
||||
"error", err,
|
||||
)
|
||||
|
|
@ -438,7 +438,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta
|
|||
}
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "Task status updated",
|
||||
logger.Debug(ctx, "task status updated",
|
||||
"task_id", taskID,
|
||||
"status", status,
|
||||
)
|
||||
|
|
@ -451,16 +451,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
|||
stackTrace := fmt.Sprintf("%+v", err)
|
||||
|
||||
if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr)
|
||||
return updateErr
|
||||
}
|
||||
|
||||
if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil {
|
||||
logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr)
|
||||
logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr)
|
||||
return updateErr
|
||||
}
|
||||
|
||||
logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg)
|
||||
logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -469,7 +469,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID,
|
|||
func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error {
|
||||
handler, err := w.factory.GetHandler(taskType)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "No handler for task type", "task_type", taskType)
|
||||
logger.Error(ctx, "no handler for task type", "task_type", taskType)
|
||||
msg.Nack(false, false)
|
||||
return err
|
||||
}
|
||||
|
|
@ -519,7 +519,7 @@ func (w *TaskWorker) checkHealth() {
|
|||
w.metrics.WorkersIdle = w.pool.Free()
|
||||
w.metrics.LastHealthCheck = time.Now()
|
||||
|
||||
logger.Info(w.ctx, "Worker health check",
|
||||
logger.Info(w.ctx, "worker health check",
|
||||
"tasks_processed", w.metrics.TotalProcessed,
|
||||
"tasks_failed", w.metrics.TotalFailed,
|
||||
"tasks_success", w.metrics.TotalSuccess,
|
||||
|
|
@ -536,7 +536,7 @@ func (w *TaskWorker) checkHealth() {
|
|||
|
||||
// Stop gracefully stops the task worker
|
||||
func (w *TaskWorker) Stop() error {
|
||||
logger.Info(w.ctx, "Stopping task worker")
|
||||
logger.Info(w.ctx, "stopping task worker")
|
||||
|
||||
// Signal all goroutines to stop
|
||||
close(w.stopChan)
|
||||
|
|
|
|||
Loading…
Reference in New Issue