diff --git a/deploy/dockerfile/modelrt.Dockerfile b/deploy/dockerfile/modelrt.Dockerfile index 4f83bb4..57819b9 100644 --- a/deploy/dockerfile/modelrt.Dockerfile +++ b/deploy/dockerfile/modelrt.Dockerfile @@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \ -mod=readonly \ -o modelrt main.go -# Prepare runtime dependencies in a pinned Alpine stage so they can be -# copied into scratch without pulling any vulnerable OS packages at run time. +# prepare runtime dependencies in a pinned alpine stage so they can be +# copied into scratch without pulling any vulnerable os packages at run time. FROM alpine:3.21 AS certs ARG USER_ID=1000 RUN apk --no-cache add ca-certificates tzdata && \ diff --git a/main.go b/main.go index 259b269..ea1a710 100644 --- a/main.go +++ b/main.go @@ -179,8 +179,8 @@ func main() { // init async task worker taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient) if err != nil { - logger.Error(ctx, "Failed to initialize task worker", "error", err) - // Continue without task worker, but log warning + logger.Error(ctx, "failed to initialize task worker", "error", err) + // continue without task worker, but log warning } else { go taskWorker.Start() defer taskWorker.Stop() @@ -258,7 +258,7 @@ func main() { gin.SetMode(gin.ReleaseMode) } engine := gin.New() - // 添加CORS中间件 + // add CORS middleware engine.Use(cors.New(cors.Config{ AllowOrigins: []string{"*"}, // 或指定具体域名 AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, @@ -278,7 +278,7 @@ func main() { Handler: engine, } - // creating a System Signal Receiver + // creating a system signal receiver done := make(chan os.Signal, 10) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { diff --git a/task/handler_factory.go b/task/handler_factory.go index b705e27..26e295a 100644 --- a/task/handler_factory.go +++ b/task/handler_factory.go @@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType, defer f.mu.Unlock() f.handlers[taskType] = handler - logger.Info(ctx, "Handler registered", + logger.Info(ctx, "handler registered", "task_type", taskType, "handler_name", handler.Name(), ) @@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler { // Execute processes an event analysis task func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Starting event analysis", + logger.Info(ctx, "starting event analysis", "task_id", taskID, "task_params", params, ) @@ -332,7 +332,7 @@ func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa // 4. Storing results in database // Simulate work - logger.Info(ctx, "Event analysis completed", + logger.Info(ctx, "event analysis completed", "task_id", taskID, "task_params", params, "db", db, @@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler { // Execute processes a batch import task func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Starting batch import", + logger.Info(ctx, "starting batch import", "task_id", taskID, "task_params", params, "db", db, @@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para // 4. Generating import report // Simulate work - logger.Info(ctx, "Batch import completed", + logger.Info(ctx, "batch import completed", "task_id", taskID, "task_params", params, "db", db, diff --git a/task/initializer.go b/task/initializer.go index 008d721..91db9b4 100644 --- a/task/initializer.go +++ b/task/initializer.go @@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D return nil, fmt.Errorf("failed to create task worker: %w", err) } - logger.Info(ctx, "Task worker initialized", + logger.Info(ctx, "task worker initialized", "worker_pool_size", workerCfg.PoolSize, "queue_consumers", workerCfg.QueueConsumerCount, ) diff --git a/task/metrics_logger.go b/task/metrics_logger.go index d8c0675..8b8521f 100644 --- a/task/metrics_logger.go +++ b/task/metrics_logger.go @@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger { // LogTaskMetrics records task processing metrics func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) { - logger.Info(m.ctx, "Task metrics", + logger.Info(m.ctx, "task metrics", "task_type", taskType, "status", status, "processing_time_ms", processingTime.Milliseconds(), @@ -33,7 +33,7 @@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process // LogQueueMetrics records queue metrics func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) { - logger.Info(m.ctx, "Queue metrics", + logger.Info(m.ctx, "queue metrics", "queue_depth", queueDepth, "queue_latency_ms", queueLatency.Milliseconds(), "metric_type", "queue", @@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati // LogWorkerMetrics records worker metrics func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) { - logger.Info(m.ctx, "Worker metrics", + logger.Info(m.ctx, "worker metrics", "active_workers", activeWorkers, "idle_workers", idleWorkers, "total_workers", totalWorkers, @@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker // LogRetryMetrics records retry metrics func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) { - logger.Info(m.ctx, "Retry metrics", + logger.Info(m.ctx, "retry metrics", "task_type", taskType, "retry_count", retryCount, "retry_success", success, @@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) - logger.Info(m.ctx, "System metrics", + logger.Info(m.ctx, "system metrics", "metric_type", "system", "timestamp", time.Now().Unix(), "goroutines", runtime.NumGoroutine(), @@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() { func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) { duration := endTime.Sub(startTime) - logger.Info(m.ctx, "Task completion metrics", + logger.Info(m.ctx, "task completion metrics", "metric_type", "task_completion", "timestamp", time.Now().Unix(), "task_id", taskID, @@ -107,7 +107,7 @@ func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string // LogHealthCheckMetrics records health check metrics func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) { - logger.Info(m.ctx, "Health check metrics", + logger.Info(m.ctx, "health check metrics", "metric_type", "health_check", "timestamp", time.Now().Unix(), "healthy", healthy, diff --git a/task/queue_producer.go b/task/queue_producer.go index 3ae6185..cfacad6 100644 --- a/task/queue_producer.go +++ b/task/queue_producer.go @@ -156,7 +156,7 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT return fmt.Errorf("failed to publish task message: %w", err) } - logger.Info(ctx, "Task published to queue", + logger.Info(ctx, "task published to queue", "task_id", taskID.String(), "task_type", taskType, "priority", priority, @@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU backoff := time.Duration(1<= s.MaxRetries { - logger.Info(ctx, "Task reached maximum retry count", + logger.Info(ctx, "task reached maximum retry count", "task_id", taskID, "retry_count", retryCount, "max_retries", s.MaxRetries, @@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string } } - logger.Info(ctx, "Task will be retried", + logger.Info(ctx, "task will be retried", "task_id", taskID, "retry_count", retryCount, "next_retry_in", delay, diff --git a/task/retry_queue.go b/task/retry_queue.go index 34499bd..887a687 100644 --- a/task/retry_queue.go +++ b/task/retry_queue.go @@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError) if !shouldRetry { // Mark task as permanently failed - logger.Info(ctx, "Task will not be retried, marking as failed", + logger.Info(ctx, "task will not be retried, marking as failed", "task_id", taskID, "retry_count", retryCount, "max_retries", q.strategy.GetMaxRetries(), @@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy } if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil { // Log but don't fail the whole retry scheduling - logger.Warn(ctx, "Failed to update task error info", + logger.Warn(ctx, "failed to update task error info", "task_id", taskID, "error", err, ) @@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy }) if err != nil { - logger.Error(ctx, "Failed to schedule task retry", + logger.Error(ctx, "failed to schedule task retry", "task_id", taskID, "task_type", taskType, "retry_count", retryCount, @@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy return err } - logger.Info(ctx, "Task scheduled for retry", + logger.Info(ctx, "task scheduled for retry", "task_id", taskID, "task_type", taskType, "retry_count", retryCount+1, @@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Get tasks due for retry tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize) if err != nil { - logger.Error(ctx, "Failed to get tasks for retry", "error", err) + logger.Error(ctx, "failed to get tasks for retry", "error", err) return err } @@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error return nil } - logger.Info(ctx, "Processing retry queue", + logger.Info(ctx, "processing retry queue", "task_count", len(tasks), "batch_size", batchSize, ) @@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Publish task to queue for immediate processing taskType := TaskType(task.TaskType) if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil { - logger.Error(ctx, "Failed to publish retry task to queue", + logger.Error(ctx, "failed to publish retry task to queue", "task_id", task.TaskID, "task_type", taskType, "error", err, @@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Update task status back to submitted if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil { - logger.Warn(ctx, "Failed to update retry task status", + logger.Warn(ctx, "failed to update retry task status", "task_id", task.TaskID, "error", err, ) @@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Clear next retry time since task is being retried now if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil { - logger.Warn(ctx, "Failed to clear next retry time", + logger.Warn(ctx, "failed to clear next retry time", "task_id", task.TaskID, "error", err, ) } - logger.Info(ctx, "Retry task resubmitted", + logger.Info(ctx, "retry task resubmitted", "task_id", task.TaskID, "task_type", taskType, "retry_count", task.RetryCount, @@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura for { select { case <-ctx.Done(): - logger.Info(ctx, "Retry scheduler stopping") + logger.Info(ctx, "retry scheduler stopping") return case <-ticker.C: if err := q.ProcessRetryQueue(ctx, batchSize); err != nil { - logger.Error(ctx, "Error processing retry queue", "error", err) + logger.Error(ctx, "error processing retry queue", "error", err) } } } diff --git a/task/test_task.go b/task/test_task.go index 14edb14..562dbb8 100644 --- a/task/test_task.go +++ b/task/test_task.go @@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e return fmt.Errorf("invalid parameter type for TestTask") } - logger.Info(ctx, "Starting test task executionser", + logger.Info(ctx, "starting test task executionser", "task_id", taskID, "sleep_duration_seconds", params.SleepDuration, "message", params.Message, @@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e // Save result to database if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil { - logger.Error(ctx, "Failed to save test task result", + logger.Error(ctx, "failed to save test task result", "task_id", taskID, "error", err, ) return fmt.Errorf("failed to save task result: %w", err) } - logger.Info(ctx, "Test task completed successfully", + logger.Info(ctx, "test task completed successfully", "task_id", taskID, "sleep_duration_seconds", params.SleepDuration, ) @@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler { // Execute processes a test task using the unified task interface func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Executing test task", + logger.Info(ctx, "executing test task", "task_id", taskID, "task_params", params, "db", db, diff --git a/task/worker.go b/task/worker.go index d60c1a3..f9ceeed 100644 --- a/task/worker.go +++ b/task/worker.go @@ -178,7 +178,7 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg // Start begins consuming tasks from the queue func (w *TaskWorker) Start() error { - logger.Info(w.ctx, "Starting task worker", + logger.Info(w.ctx, "starting task worker", "pool_size", w.cfg.PoolSize, "queue_consumers", w.cfg.QueueConsumerCount, ) @@ -193,7 +193,7 @@ func (w *TaskWorker) Start() error { w.wg.Add(1) go w.healthCheckLoop() - logger.Info(w.ctx, "Task worker started successfully") + logger.Info(w.ctx, "task worker started successfully") return nil } @@ -201,7 +201,7 @@ func (w *TaskWorker) Start() error { func (w *TaskWorker) consumerLoop(consumerID int) { defer w.wg.Done() - logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID) + logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID) // Consume messages from the queue msgs, err := w.ch.Consume( @@ -214,7 +214,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) { nil, // args ) if err != nil { - logger.Error(w.ctx, "Failed to start consumer", + logger.Error(w.ctx, "failed to start consumer", "consumer_id", consumerID, "error", err, ) @@ -224,11 +224,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) { for { select { case <-w.stopChan: - logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID) + logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID) return case msg, ok := <-msgs: if !ok { - logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID) + logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID) return } @@ -237,7 +237,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) { w.handleMessage(msg) }) if err != nil { - logger.Error(w.ctx, "Failed to submit task to pool", + logger.Error(w.ctx, "failed to submit task to pool", "consumer_id", consumerID, "error", err, ) @@ -265,7 +265,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Parse task message var taskMsg TaskQueueMessage if err := json.Unmarshal(msg.Body, &taskMsg); err != nil { - logger.Error(ctx, "Failed to unmarshal task message", "error", err) + logger.Error(ctx, "failed to unmarshal task message", "error", err) msg.Nack(false, false) // Reject without requeue w.metrics.mu.Lock() w.metrics.TotalFailed++ @@ -275,7 +275,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Validate message if !taskMsg.Validate() { - logger.Error(ctx, "Invalid task message", + logger.Error(ctx, "invalid task message", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, ) @@ -299,7 +299,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { defer span.End() ctx = taskCtx - logger.Info(ctx, "Processing task", + logger.Info(ctx, "processing task", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "priority", taskMsg.Priority, @@ -307,7 +307,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Update task status to RUNNING in database if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil { - logger.Error(ctx, "Failed to update task status", "error", err) + logger.Error(ctx, "failed to update task status", "error", err) msg.Nack(false, true) // Reject with requeue w.metrics.mu.Lock() w.metrics.TotalFailed++ @@ -326,7 +326,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { processingTime := time.Since(startTime) if err != nil { - logger.Error(ctx, "Task execution failed", + logger.Error(ctx, "task execution failed", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "processing_time", processingTime, @@ -335,7 +335,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Update task status to FAILED if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil { - logger.Error(ctx, "Failed to update task with error", "error", updateErr) + logger.Error(ctx, "failed to update task with error", "error", updateErr) } if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil { @@ -353,7 +353,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Update task status to COMPLETED if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil { - logger.Error(ctx, "Failed to update task status to completed", "error", err) + logger.Error(ctx, "failed to update task status to completed", "error", err) // Still ack the message since task was processed successfully } @@ -364,7 +364,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Acknowledge message msg.Ack(false) - logger.Info(ctx, "Task completed successfully", + logger.Info(ctx, "task completed successfully", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "processing_time", processingTime, @@ -398,7 +398,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta // Update task status in database err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus) if err != nil { - logger.Error(ctx, "Failed to update task status in database", + logger.Error(ctx, "failed to update task status in database", "task_id", taskID, "status", status, "error", err, @@ -410,7 +410,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta if status == StatusRunning { startedAt := time.Now().Unix() if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil { - logger.Warn(ctx, "Failed to update task start time", + logger.Warn(ctx, "failed to update task start time", "task_id", taskID, "error", err, ) @@ -423,14 +423,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta finishedAt := time.Now().Unix() if status == StatusCompleted { if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { - logger.Warn(ctx, "Failed to mark task as completed", + logger.Warn(ctx, "failed to mark task as completed", "task_id", taskID, "error", err, ) } } else { if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { - logger.Warn(ctx, "Failed to mark task as failed", + logger.Warn(ctx, "failed to mark task as failed", "task_id", taskID, "error", err, ) @@ -438,7 +438,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta } } - logger.Debug(ctx, "Task status updated", + logger.Debug(ctx, "task status updated", "task_id", taskID, "status", status, ) @@ -451,16 +451,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID, stackTrace := fmt.Sprintf("%+v", err) if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil { - logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr) + logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr) return updateErr } if updateErr := database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil { - logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr) + logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr) return updateErr } - logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg) + logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg) return nil } @@ -469,7 +469,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID, func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error { handler, err := w.factory.GetHandler(taskType) if err != nil { - logger.Error(ctx, "No handler for task type", "task_type", taskType) + logger.Error(ctx, "no handler for task type", "task_type", taskType) msg.Nack(false, false) return err } @@ -519,7 +519,7 @@ func (w *TaskWorker) checkHealth() { w.metrics.WorkersIdle = w.pool.Free() w.metrics.LastHealthCheck = time.Now() - logger.Info(w.ctx, "Worker health check", + logger.Info(w.ctx, "worker health check", "tasks_processed", w.metrics.TotalProcessed, "tasks_failed", w.metrics.TotalFailed, "tasks_success", w.metrics.TotalSuccess, @@ -536,7 +536,7 @@ func (w *TaskWorker) checkHealth() { // Stop gracefully stops the task worker func (w *TaskWorker) Stop() error { - logger.Info(w.ctx, "Stopping task worker") + logger.Info(w.ctx, "stopping task worker") // Signal all goroutines to stop close(w.stopChan)