From c6545e29ba564c2ad67c4b4371df66171a5cdda1 Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 1 Jun 2026 15:50:11 +0800 Subject: [PATCH 01/12] style: normalize log messages to lowercase across task package - lowercase first letter of all logger.Info/Warn/Error message strings in task/worker.go, task/retry_queue.go, task/handler_factory.go, task/metrics_logger.go, task/retry_manager.go, task/queue_producer.go, task/initializer.go, task/test_task.go, and main.go - fix inline comments in main.go that mixed Chinese and uppercase English - align Dockerfile comment casing with project convention --- deploy/dockerfile/modelrt.Dockerfile | 4 +-- main.go | 8 ++--- task/handler_factory.go | 10 +++--- task/initializer.go | 2 +- task/metrics_logger.go | 14 ++++---- task/queue_producer.go | 4 +-- task/retry_manager.go | 4 +-- task/retry_queue.go | 24 ++++++------- task/test_task.go | 8 ++--- task/worker.go | 52 ++++++++++++++-------------- 10 files changed, 65 insertions(+), 65 deletions(-) diff --git a/deploy/dockerfile/modelrt.Dockerfile b/deploy/dockerfile/modelrt.Dockerfile index 4f83bb4..57819b9 100644 --- a/deploy/dockerfile/modelrt.Dockerfile +++ b/deploy/dockerfile/modelrt.Dockerfile @@ -11,8 +11,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build \ -mod=readonly \ -o modelrt main.go -# Prepare runtime dependencies in a pinned Alpine stage so they can be -# copied into scratch without pulling any vulnerable OS packages at run time. +# prepare runtime dependencies in a pinned alpine stage so they can be +# copied into scratch without pulling any vulnerable os packages at run time. 
FROM alpine:3.21 AS certs ARG USER_ID=1000 RUN apk --no-cache add ca-certificates tzdata && \ diff --git a/main.go b/main.go index 259b269..ea1a710 100644 --- a/main.go +++ b/main.go @@ -179,8 +179,8 @@ func main() { // init async task worker taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient) if err != nil { - logger.Error(ctx, "Failed to initialize task worker", "error", err) - // Continue without task worker, but log warning + logger.Error(ctx, "failed to initialize task worker", "error", err) + // continue without task worker, but log warning } else { go taskWorker.Start() defer taskWorker.Stop() @@ -258,7 +258,7 @@ func main() { gin.SetMode(gin.ReleaseMode) } engine := gin.New() - // 添加CORS中间件 + // add CORS middleware engine.Use(cors.New(cors.Config{ AllowOrigins: []string{"*"}, // 或指定具体域名 AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, @@ -278,7 +278,7 @@ func main() { Handler: engine, } - // creating a System Signal Receiver + // creating a system signal receiver done := make(chan os.Signal, 10) signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { diff --git a/task/handler_factory.go b/task/handler_factory.go index b705e27..26e295a 100644 --- a/task/handler_factory.go +++ b/task/handler_factory.go @@ -44,7 +44,7 @@ func (f *HandlerFactory) RegisterHandler(ctx context.Context, taskType TaskType, defer f.mu.Unlock() f.handlers[taskType] = handler - logger.Info(ctx, "Handler registered", + logger.Info(ctx, "handler registered", "task_type", taskType, "handler_name", handler.Name(), ) @@ -319,7 +319,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler { // Execute processes an event analysis task func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Starting event analysis", + logger.Info(ctx, "starting event analysis", "task_id", taskID, "task_params", params, ) @@ -332,7 +332,7 @@ func (h 
*EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, pa // 4. Storing results in database // Simulate work - logger.Info(ctx, "Event analysis completed", + logger.Info(ctx, "event analysis completed", "task_id", taskID, "task_params", params, "db", db, @@ -360,7 +360,7 @@ func NewBatchImportHandler() *BatchImportHandler { // Execute processes a batch import task func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Starting batch import", + logger.Info(ctx, "starting batch import", "task_id", taskID, "task_params", params, "db", db, @@ -374,7 +374,7 @@ func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, para // 4. Generating import report // Simulate work - logger.Info(ctx, "Batch import completed", + logger.Info(ctx, "batch import completed", "task_id", taskID, "task_params", params, "db", db, diff --git a/task/initializer.go b/task/initializer.go index 008d721..91db9b4 100644 --- a/task/initializer.go +++ b/task/initializer.go @@ -30,7 +30,7 @@ func InitTaskWorker(ctx context.Context, config config.ModelRTConfig, db *gorm.D return nil, fmt.Errorf("failed to create task worker: %w", err) } - logger.Info(ctx, "Task worker initialized", + logger.Info(ctx, "task worker initialized", "worker_pool_size", workerCfg.PoolSize, "queue_consumers", workerCfg.QueueConsumerCount, ) diff --git a/task/metrics_logger.go b/task/metrics_logger.go index d8c0675..8b8521f 100644 --- a/task/metrics_logger.go +++ b/task/metrics_logger.go @@ -21,7 +21,7 @@ func NewMetricsLogger(ctx context.Context) *MetricsLogger { // LogTaskMetrics records task processing metrics func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, processingTime time.Duration, retryCount int) { - logger.Info(m.ctx, "Task metrics", + logger.Info(m.ctx, "task metrics", "task_type", taskType, "status", status, "processing_time_ms", processingTime.Milliseconds(), @@ -33,7 +33,7 
@@ func (m *MetricsLogger) LogTaskMetrics(taskType TaskType, status string, process // LogQueueMetrics records queue metrics func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Duration) { - logger.Info(m.ctx, "Queue metrics", + logger.Info(m.ctx, "queue metrics", "queue_depth", queueDepth, "queue_latency_ms", queueLatency.Milliseconds(), "metric_type", "queue", @@ -43,7 +43,7 @@ func (m *MetricsLogger) LogQueueMetrics(queueDepth int, queueLatency time.Durati // LogWorkerMetrics records worker metrics func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorkers int, memoryUsage uint64, cpuLoad float64) { - logger.Info(m.ctx, "Worker metrics", + logger.Info(m.ctx, "worker metrics", "active_workers", activeWorkers, "idle_workers", idleWorkers, "total_workers", totalWorkers, @@ -56,7 +56,7 @@ func (m *MetricsLogger) LogWorkerMetrics(activeWorkers, idleWorkers, totalWorker // LogRetryMetrics records retry metrics func (m *MetricsLogger) LogRetryMetrics(taskType TaskType, retryCount int, success bool, delay time.Duration) { - logger.Info(m.ctx, "Retry metrics", + logger.Info(m.ctx, "retry metrics", "task_type", taskType, "retry_count", retryCount, "retry_success", success, @@ -71,7 +71,7 @@ func (m *MetricsLogger) LogSystemMetrics() { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) - logger.Info(m.ctx, "System metrics", + logger.Info(m.ctx, "system metrics", "metric_type", "system", "timestamp", time.Now().Unix(), "goroutines", runtime.NumGoroutine(), @@ -90,7 +90,7 @@ func (m *MetricsLogger) LogSystemMetrics() { func (m *MetricsLogger) LogTaskCompletionMetrics(taskID, taskType, status string, startTime, endTime time.Time, retryCount int, errorMsg string) { duration := endTime.Sub(startTime) - logger.Info(m.ctx, "Task completion metrics", + logger.Info(m.ctx, "task completion metrics", "metric_type", "task_completion", "timestamp", time.Now().Unix(), "task_id", taskID, @@ -107,7 +107,7 @@ func (m *MetricsLogger) 
LogTaskCompletionMetrics(taskID, taskType, status string // LogHealthCheckMetrics records health check metrics func (m *MetricsLogger) LogHealthCheckMetrics(healthy bool, checkDuration time.Duration, components map[string]bool) { - logger.Info(m.ctx, "Health check metrics", + logger.Info(m.ctx, "health check metrics", "metric_type", "health_check", "timestamp", time.Now().Unix(), "healthy", healthy, diff --git a/task/queue_producer.go b/task/queue_producer.go index 3ae6185..cfacad6 100644 --- a/task/queue_producer.go +++ b/task/queue_producer.go @@ -156,7 +156,7 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT return fmt.Errorf("failed to publish task message: %w", err) } - logger.Info(ctx, "Task published to queue", + logger.Info(ctx, "task published to queue", "task_id", taskID.String(), "task_type", taskType, "priority", priority, @@ -180,7 +180,7 @@ func (p *QueueProducer) PublishTaskWithRetry(ctx context.Context, taskID uuid.UU backoff := time.Duration(1<= s.MaxRetries { - logger.Info(ctx, "Task reached maximum retry count", + logger.Info(ctx, "task reached maximum retry count", "task_id", taskID, "retry_count", retryCount, "max_retries", s.MaxRetries, @@ -86,7 +86,7 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string } } - logger.Info(ctx, "Task will be retried", + logger.Info(ctx, "task will be retried", "task_id", taskID, "retry_count", retryCount, "next_retry_in", delay, diff --git a/task/retry_queue.go b/task/retry_queue.go index 34499bd..887a687 100644 --- a/task/retry_queue.go +++ b/task/retry_queue.go @@ -38,7 +38,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy shouldRetry, delay := q.strategy.ShouldRetry(ctx, taskID.String(), retryCount, lastError) if !shouldRetry { // Mark task as permanently failed - logger.Info(ctx, "Task will not be retried, marking as failed", + logger.Info(ctx, "task will not be retried, marking as failed", "task_id", taskID, 
"retry_count", retryCount, "max_retries", q.strategy.GetMaxRetries(), @@ -63,7 +63,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy } if err := database.UpdateTaskErrorInfo(ctx, tx, taskID, errorMsg, ""); err != nil { // Log but don't fail the whole retry scheduling - logger.Warn(ctx, "Failed to update task error info", + logger.Warn(ctx, "failed to update task error info", "task_id", taskID, "error", err, ) @@ -74,7 +74,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy }) if err != nil { - logger.Error(ctx, "Failed to schedule task retry", + logger.Error(ctx, "failed to schedule task retry", "task_id", taskID, "task_type", taskType, "retry_count", retryCount, @@ -84,7 +84,7 @@ func (q *RetryQueue) ScheduleRetry(ctx context.Context, taskID uuid.UUID, taskTy return err } - logger.Info(ctx, "Task scheduled for retry", + logger.Info(ctx, "task scheduled for retry", "task_id", taskID, "task_type", taskType, "retry_count", retryCount+1, @@ -100,7 +100,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Get tasks due for retry tasks, err := database.GetTasksForRetry(ctx, q.db, batchSize) if err != nil { - logger.Error(ctx, "Failed to get tasks for retry", "error", err) + logger.Error(ctx, "failed to get tasks for retry", "error", err) return err } @@ -108,7 +108,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error return nil } - logger.Info(ctx, "Processing retry queue", + logger.Info(ctx, "processing retry queue", "task_count", len(tasks), "batch_size", batchSize, ) @@ -121,7 +121,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Publish task to queue for immediate processing taskType := TaskType(task.TaskType) if err := q.producer.PublishTask(ctx, task.TaskID, taskType, task.Priority, map[string]any(task.Params)); err != nil { - logger.Error(ctx, "Failed to publish retry task to queue", + 
logger.Error(ctx, "failed to publish retry task to queue", "task_id", task.TaskID, "task_type", taskType, "error", err, @@ -132,7 +132,7 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Update task status back to submitted if err := database.UpdateAsyncTaskStatus(ctx, q.db, task.TaskID, "SUBMITTED"); err != nil { - logger.Warn(ctx, "Failed to update retry task status", + logger.Warn(ctx, "failed to update retry task status", "task_id", task.TaskID, "error", err, ) @@ -140,13 +140,13 @@ func (q *RetryQueue) ProcessRetryQueue(ctx context.Context, batchSize int) error // Clear next retry time since task is being retried now if err := database.UpdateTaskRetryInfo(ctx, q.db, task.TaskID, task.RetryCount, 0); err != nil { - logger.Warn(ctx, "Failed to clear next retry time", + logger.Warn(ctx, "failed to clear next retry time", "task_id", task.TaskID, "error", err, ) } - logger.Info(ctx, "Retry task resubmitted", + logger.Info(ctx, "retry task resubmitted", "task_id", task.TaskID, "task_type", taskType, "retry_count", task.RetryCount, @@ -166,11 +166,11 @@ func (q *RetryQueue) StartRetryScheduler(ctx context.Context, interval time.Dura for { select { case <-ctx.Done(): - logger.Info(ctx, "Retry scheduler stopping") + logger.Info(ctx, "retry scheduler stopping") return case <-ticker.C: if err := q.ProcessRetryQueue(ctx, batchSize); err != nil { - logger.Error(ctx, "Error processing retry queue", "error", err) + logger.Error(ctx, "error processing retry queue", "error", err) } } } diff --git a/task/test_task.go b/task/test_task.go index 14edb14..562dbb8 100644 --- a/task/test_task.go +++ b/task/test_task.go @@ -91,7 +91,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e return fmt.Errorf("invalid parameter type for TestTask") } - logger.Info(ctx, "Starting test task executionser", + logger.Info(ctx, "starting test task executionser", "task_id", taskID, "sleep_duration_seconds", params.SleepDuration, 
"message", params.Message, @@ -113,14 +113,14 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e // Save result to database if err := database.UpdateAsyncTaskResultWithSuccess(ctx, db, taskID, orm.JSONMap(result)); err != nil { - logger.Error(ctx, "Failed to save test task result", + logger.Error(ctx, "failed to save test task result", "task_id", taskID, "error", err, ) return fmt.Errorf("failed to save task result: %w", err) } - logger.Info(ctx, "Test task completed successfully", + logger.Info(ctx, "test task completed successfully", "task_id", taskID, "sleep_duration_seconds", params.SleepDuration, ) @@ -142,7 +142,7 @@ func NewTestTaskHandler() *TestTaskHandler { // Execute processes a test task using the unified task interface func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, params map[string]any, db *gorm.DB) error { - logger.Info(ctx, "Executing test task", + logger.Info(ctx, "executing test task", "task_id", taskID, "task_params", params, "db", db, diff --git a/task/worker.go b/task/worker.go index d60c1a3..f9ceeed 100644 --- a/task/worker.go +++ b/task/worker.go @@ -178,7 +178,7 @@ func NewTaskWorker(ctx context.Context, cfg WorkerConfig, db *gorm.DB, rabbitCfg // Start begins consuming tasks from the queue func (w *TaskWorker) Start() error { - logger.Info(w.ctx, "Starting task worker", + logger.Info(w.ctx, "starting task worker", "pool_size", w.cfg.PoolSize, "queue_consumers", w.cfg.QueueConsumerCount, ) @@ -193,7 +193,7 @@ func (w *TaskWorker) Start() error { w.wg.Add(1) go w.healthCheckLoop() - logger.Info(w.ctx, "Task worker started successfully") + logger.Info(w.ctx, "task worker started successfully") return nil } @@ -201,7 +201,7 @@ func (w *TaskWorker) Start() error { func (w *TaskWorker) consumerLoop(consumerID int) { defer w.wg.Done() - logger.Info(w.ctx, "Starting consumer", "consumer_id", consumerID) + logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID) // Consume messages 
from the queue msgs, err := w.ch.Consume( @@ -214,7 +214,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) { nil, // args ) if err != nil { - logger.Error(w.ctx, "Failed to start consumer", + logger.Error(w.ctx, "failed to start consumer", "consumer_id", consumerID, "error", err, ) @@ -224,11 +224,11 @@ func (w *TaskWorker) consumerLoop(consumerID int) { for { select { case <-w.stopChan: - logger.Info(w.ctx, "Consumer stopping", "consumer_id", consumerID) + logger.Info(w.ctx, "consumer stopping", "consumer_id", consumerID) return case msg, ok := <-msgs: if !ok { - logger.Warn(w.ctx, "Consumer channel closed", "consumer_id", consumerID) + logger.Warn(w.ctx, "consumer channel closed", "consumer_id", consumerID) return } @@ -237,7 +237,7 @@ func (w *TaskWorker) consumerLoop(consumerID int) { w.handleMessage(msg) }) if err != nil { - logger.Error(w.ctx, "Failed to submit task to pool", + logger.Error(w.ctx, "failed to submit task to pool", "consumer_id", consumerID, "error", err, ) @@ -265,7 +265,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Parse task message var taskMsg TaskQueueMessage if err := json.Unmarshal(msg.Body, &taskMsg); err != nil { - logger.Error(ctx, "Failed to unmarshal task message", "error", err) + logger.Error(ctx, "failed to unmarshal task message", "error", err) msg.Nack(false, false) // Reject without requeue w.metrics.mu.Lock() w.metrics.TotalFailed++ @@ -275,7 +275,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Validate message if !taskMsg.Validate() { - logger.Error(ctx, "Invalid task message", + logger.Error(ctx, "invalid task message", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, ) @@ -299,7 +299,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { defer span.End() ctx = taskCtx - logger.Info(ctx, "Processing task", + logger.Info(ctx, "processing task", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "priority", taskMsg.Priority, @@ -307,7 +307,7 @@ func (w *TaskWorker) 
handleMessage(msg amqp.Delivery) { // Update task status to RUNNING in database if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusRunning); err != nil { - logger.Error(ctx, "Failed to update task status", "error", err) + logger.Error(ctx, "failed to update task status", "error", err) msg.Nack(false, true) // Reject with requeue w.metrics.mu.Lock() w.metrics.TotalFailed++ @@ -326,7 +326,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { processingTime := time.Since(startTime) if err != nil { - logger.Error(ctx, "Task execution failed", + logger.Error(ctx, "task execution failed", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "processing_time", processingTime, @@ -335,7 +335,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Update task status to FAILED if updateErr := w.updateTaskWithError(ctx, taskMsg.TaskID, err); updateErr != nil { - logger.Error(ctx, "Failed to update task with error", "error", updateErr) + logger.Error(ctx, "failed to update task with error", "error", updateErr) } if record, recErr := event.NewTaskFailedMessage(taskMsg.TaskID.String(), string(taskMsg.TaskType), err.Error()); recErr == nil { @@ -353,7 +353,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Update task status to COMPLETED if err := w.updateTaskStatus(ctx, taskMsg.TaskID, StatusCompleted); err != nil { - logger.Error(ctx, "Failed to update task status to completed", "error", err) + logger.Error(ctx, "failed to update task status to completed", "error", err) // Still ack the message since task was processed successfully } @@ -364,7 +364,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Acknowledge message msg.Ack(false) - logger.Info(ctx, "Task completed successfully", + logger.Info(ctx, "task completed successfully", "task_id", taskMsg.TaskID, "task_type", taskMsg.TaskType, "processing_time", processingTime, @@ -398,7 +398,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta // 
Update task status in database err := database.UpdateAsyncTaskStatus(ctx, w.db, taskID, ormStatus) if err != nil { - logger.Error(ctx, "Failed to update task status in database", + logger.Error(ctx, "failed to update task status in database", "task_id", taskID, "status", status, "error", err, @@ -410,7 +410,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta if status == StatusRunning { startedAt := time.Now().Unix() if err := database.UpdateTaskStarted(ctx, w.db, taskID, startedAt); err != nil { - logger.Warn(ctx, "Failed to update task start time", + logger.Warn(ctx, "failed to update task start time", "task_id", taskID, "error", err, ) @@ -423,14 +423,14 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta finishedAt := time.Now().Unix() if status == StatusCompleted { if err := database.CompleteAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { - logger.Warn(ctx, "Failed to mark task as completed", + logger.Warn(ctx, "failed to mark task as completed", "task_id", taskID, "error", err, ) } } else { if err := database.FailAsyncTask(ctx, w.db, taskID, finishedAt); err != nil { - logger.Warn(ctx, "Failed to mark task as failed", + logger.Warn(ctx, "failed to mark task as failed", "task_id", taskID, "error", err, ) @@ -438,7 +438,7 @@ func (w *TaskWorker) updateTaskStatus(ctx context.Context, taskID uuid.UUID, sta } } - logger.Debug(ctx, "Task status updated", + logger.Debug(ctx, "task status updated", "task_id", taskID, "status", status, ) @@ -451,16 +451,16 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID, stackTrace := fmt.Sprintf("%+v", err) if updateErr := database.UpdateTaskErrorInfo(ctx, w.db, taskID, errorMsg, stackTrace); updateErr != nil { - logger.Error(ctx, "Failed to update task error info", "task_id", taskID, "error", updateErr) + logger.Error(ctx, "failed to update task error info", "task_id", taskID, "error", updateErr) return updateErr } if updateErr 
:= database.UpdateAsyncTaskResultWithError(ctx, w.db, taskID, 500, errorMsg, nil); updateErr != nil { - logger.Error(ctx, "Failed to update task result with error", "task_id", taskID, "error", updateErr) + logger.Error(ctx, "failed to update task result with error", "task_id", taskID, "error", updateErr) return updateErr } - logger.Warn(ctx, "Task failed with error", "task_id", taskID, "error", errorMsg) + logger.Warn(ctx, "task failed with error", "task_id", taskID, "error", errorMsg) return nil } @@ -469,7 +469,7 @@ func (w *TaskWorker) updateTaskWithError(ctx context.Context, taskID uuid.UUID, func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uuid.UUID, params map[string]any, msg *amqp.Delivery) error { handler, err := w.factory.GetHandler(taskType) if err != nil { - logger.Error(ctx, "No handler for task type", "task_type", taskType) + logger.Error(ctx, "no handler for task type", "task_type", taskType) msg.Nack(false, false) return err } @@ -519,7 +519,7 @@ func (w *TaskWorker) checkHealth() { w.metrics.WorkersIdle = w.pool.Free() w.metrics.LastHealthCheck = time.Now() - logger.Info(w.ctx, "Worker health check", + logger.Info(w.ctx, "worker health check", "tasks_processed", w.metrics.TotalProcessed, "tasks_failed", w.metrics.TotalFailed, "tasks_success", w.metrics.TotalSuccess, @@ -536,7 +536,7 @@ func (w *TaskWorker) checkHealth() { // Stop gracefully stops the task worker func (w *TaskWorker) Stop() error { - logger.Info(w.ctx, "Stopping task worker") + logger.Info(w.ctx, "stopping task worker") // Signal all goroutines to stop close(w.stopChan) From 3309e536531343293d7889cdad5b14cd7744c61b Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 2 Jun 2026 16:35:13 +0800 Subject: [PATCH 02/12] docs: document Dockerfile smoke tests and load workflow for Minikube - add 3-stage build table (builder/certs/scratch) with image size note - add build-arg USER_ID override example in section 5.1 - add section 5.1.1 with smoke-test commands (size 
check, inspect, dry run, full start) - add workflow for loading pre-built local images into Minikube directly - bump builder base image from golang:1.25-alpine to golang:1.26-alpine - normalize inline Dockerfile comments to lowercase - remove example config COPY from final scratch stage --- deploy/deploy.md | 57 +++++++++++++++++++++++++++- deploy/dockerfile/modelrt.Dockerfile | 7 ++-- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/deploy/deploy.md b/deploy/deploy.md index 7b758b0..57c50f1 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -787,14 +787,67 @@ kubectl delete -f deploy/k8s/mongodb-service.yaml \ #### 5.1 构建并推送镜像 +镜像采用三阶段构建,最终基于 `scratch`: + +| 阶段 | 基础镜像 | 作用 | +| :--- | :--- | :--- | +| **builder** | `golang:1.26-alpine` | 编译 Go 二进制(`CGO_ENABLED=0`,`-trimpath -ldflags="-s -w"`) | +| **certs** | `alpine:3.21` | 提取 CA 证书、时区数据及非 root 用户定义(UID 默认 `1000`) | +| **runtime** | `scratch` | 仅含可执行文件与运行时依赖,无 shell、无包管理器 | + +**方式一:从源码构建并加载** + ```bash -# 在项目根目录执行 +# 在项目根目录执行(默认运行用户 UID=1000) docker build -f deploy/dockerfile/modelrt.Dockerfile -t coslight/modelrt:latest . -# 推送到镜像仓库(或直接加载到 Minikube) +# 自定义运行用户 UID +docker build -f deploy/dockerfile/modelrt.Dockerfile \ + --build-arg USER_ID=2000 \ + -t coslight/modelrt:latest . 
+ +# 加载到 Minikube(无需私有仓库) minikube image load coslight/modelrt:latest ``` +**方式二:直接加载已有本地镜像** + +Ubuntu 宿主机上已存在构建好的镜像(如 `modelrt:v1`)时,无需重新构建,直接导入 Minikube: + +```bash +# 确认本地镜像存在 +docker images modelrt:v1 + +# 加载到 Minikube +minikube image load modelrt:v1 + +# 验证镜像已进入 Minikube 缓存 +minikube image ls | grep modelrt +``` + +> **注意:** `deploy/k8s/modelrt-deployment.yaml` 中的 `image` 字段需与加载的镜像名称一致,并将 `imagePullPolicy` 设为 `Never`,防止 Minikube 尝试从远端拉取。 + +#### 5.1.1 镜像冒烟测试 + +```bash +# 查看镜像大小(scratch 镜像预期 ≤ 25 MB) +docker images coslight/modelrt:latest + +# 检查镜像元信息(确认 User、Cmd、架构) +docker inspect coslight/modelrt:latest + +# 验证二进制可执行(无 config 时程序报错退出属预期行为,说明镜像构建正常) +docker run --rm coslight/modelrt:latest + +# 挂载示例配置做完整启动验证(Ctrl+C 退出) +docker run --rm \ + -v "$(pwd)/configs/config.example.yaml:/app/configs/config.yaml" \ + -p 8080:8080 \ + coslight/modelrt:latest +``` + +> **注意:** `scratch` 镜像不含 shell,无法使用 `docker exec` 进入容器调试;如需排查问题,可临时将最终阶段改为 `alpine` 进行本地调试,确认后再切回 `scratch`。 + #### 5.2 创建客户端证书 Secret 在 RabbitMQ TLS 证书生成完成后(见 4.2),进入证书文件所在目录执行: diff --git a/deploy/dockerfile/modelrt.Dockerfile b/deploy/dockerfile/modelrt.Dockerfile index 57819b9..74d9af5 100644 --- a/deploy/dockerfile/modelrt.Dockerfile +++ b/deploy/dockerfile/modelrt.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.25-alpine AS builder +FROM golang:1.26-alpine AS builder RUN apk --no-cache upgrade WORKDIR /app @@ -21,15 +21,14 @@ RUN apk --no-cache add ca-certificates tzdata && \ FROM scratch # CA certificates required for TLS connections (RabbitMQ amqps://) COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -# Timezone data +# timezone data COPY --from=certs /usr/share/zoneinfo /usr/share/zoneinfo -# Non-root user/group definitions +# non-root user/group definitions COPY --from=certs /etc/passwd /etc/passwd COPY --from=certs /etc/group /etc/group WORKDIR /app COPY --from=builder /app/modelrt ./modelrt -COPY configs/config.example.yaml ./configs/config.example.yaml USER modelrt CMD 
["/app/modelrt", "-modelRT_config_dir=/app/configs"] From 195150d9b1dc25c129096f34a3c58947e9233a5b Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 3 Jun 2026 17:11:54 +0800 Subject: [PATCH 03/12] fix: fix K8s service names, deployment command, and GORM logger - rename all K8s services to xxx-service convention and update all configmap references (postgres, mongodb, loki, jaeger) - add explicit command: ["/app/modelrt"] to deployment to prevent args from being treated as the executable (no ENTRYPOINT in Dockerfile) - set deploy_env to development to bypass Redis empty-password guard in non-production Minikube environment - fix GormLogger Info/Warn/Error to use fmt.Sprintf(msg, data...) so GORM printf-style messages are formatted correctly and avoid json: unsupported type: func() time.Time serialization panic - expand pg PVC storage from 2Gi to 6Gi - rename loop variable msg to task in PushTaskToRabbitMQ for clarity - align comment indentation in queue_producer.go --- deploy/k8s/grafana-configmap.yaml | 4 +-- deploy/k8s/grafana-service.yaml | 4 +-- deploy/k8s/jaeger-service.yaml | 10 +++--- deploy/k8s/loki-service.yaml | 4 +-- deploy/k8s/modelrt-configmap.yaml | 6 ++-- deploy/k8s/modelrt-deployment.yaml | 3 +- deploy/k8s/mongodb-service.yaml | 2 +- deploy/k8s/pg-pvc.yaml | 2 +- deploy/k8s/pg-service.yaml | 2 +- deploy/k8s/promtail-configmap.yaml | 2 +- logger/gorm_logger.go | 7 +++-- task/queue_producer.go | 50 +++++++++++++++--------------- 12 files changed, 49 insertions(+), 47 deletions(-) diff --git a/deploy/k8s/grafana-configmap.yaml b/deploy/k8s/grafana-configmap.yaml index 76b2cb0..234a881 100644 --- a/deploy/k8s/grafana-configmap.yaml +++ b/deploy/k8s/grafana-configmap.yaml @@ -10,7 +10,7 @@ data: - name: Loki type: loki access: proxy - url: http://loki:3100 + url: http://loki-service:3100 isDefault: true jsonData: # derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger @@ -23,4 +23,4 @@ data: type: jaeger uid: jaeger access: proxy - url: http://jaeger:16686 + url: 
http://jaeger-service:16686 diff --git a/deploy/k8s/grafana-service.yaml b/deploy/k8s/grafana-service.yaml index 1cc3782..84351ba 100644 --- a/deploy/k8s/grafana-service.yaml +++ b/deploy/k8s/grafana-service.yaml @@ -1,14 +1,14 @@ apiVersion: v1 kind: Service metadata: - name: grafana + name: grafana-service namespace: default spec: ports: - name: http port: 3000 targetPort: 3000 - nodePort: 31000 # Grafana UI: http://:31000 + nodePort: 31000 # Grafana UI: http://:31000 selector: app: grafana type: NodePort diff --git a/deploy/k8s/jaeger-service.yaml b/deploy/k8s/jaeger-service.yaml index d1e4779..ea60d1c 100644 --- a/deploy/k8s/jaeger-service.yaml +++ b/deploy/k8s/jaeger-service.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Service metadata: - name: jaeger + name: jaeger-service labels: app: jaeger spec: @@ -9,19 +9,19 @@ spec: - name: ui port: 16686 targetPort: 16686 - nodePort: 31686 # Jaeger UI,浏览器访问 http://:31686 + nodePort: 31686 # Jaeger UI,浏览器访问 http://:31686 - name: collector-http port: 14268 targetPort: 14268 - nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel) + nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel) - name: otlp-http port: 4318 targetPort: 4318 - nodePort: 31318 # OTLP HTTP,集群外使用 :31318 + nodePort: 31318 # OTLP HTTP,集群外使用 :31318 - name: otlp-grpc port: 4317 targetPort: 4317 - nodePort: 31317 # OTLP gRPC,集群外使用 :31317 + nodePort: 31317 # OTLP gRPC,集群外使用 :31317 selector: app: jaeger type: NodePort diff --git a/deploy/k8s/loki-service.yaml b/deploy/k8s/loki-service.yaml index e0df759..cf5ac94 100644 --- a/deploy/k8s/loki-service.yaml +++ b/deploy/k8s/loki-service.yaml @@ -1,14 +1,14 @@ apiVersion: v1 kind: Service metadata: - name: loki + name: loki-service namespace: default spec: ports: - name: http port: 3100 targetPort: 3100 - nodePort: 31100 # 集群外访问: http://:31100 + nodePort: 31100 # 集群外访问: http://:31100 selector: app: loki type: NodePort diff --git a/deploy/k8s/modelrt-configmap.yaml b/deploy/k8s/modelrt-configmap.yaml index b1a6afc..39f2b59 
100644 --- a/deploy/k8s/modelrt-configmap.yaml +++ b/deploy/k8s/modelrt-configmap.yaml @@ -5,7 +5,7 @@ metadata: data: config.yaml: | postgres: - host: "192.168.1.101" + host: "postgres-service" port: 5432 database: "demo" user: "postgres" @@ -35,7 +35,7 @@ data: endpoint: "" # Promtail handles log collection in K8s, direct push disabled otel: - endpoint: "jaeger:4318" + endpoint: "jaeger-service:4318" insecure: true ants: @@ -77,7 +77,7 @@ data: service_addr: ":8080" service_name: "modelRT" secret_key: "" # injected via env SERVICE_SECRET_KEY - deploy_env: "production" + deploy_env: "development" dataRT: host: "http://127.0.0.1" diff --git a/deploy/k8s/modelrt-deployment.yaml b/deploy/k8s/modelrt-deployment.yaml index 38a9a23..d6db31f 100644 --- a/deploy/k8s/modelrt-deployment.yaml +++ b/deploy/k8s/modelrt-deployment.yaml @@ -16,8 +16,9 @@ spec: spec: containers: - name: modelrt - image: coslight/modelrt:latest + image: modelrt:v1 imagePullPolicy: IfNotPresent + command: ["/app/modelrt"] args: - "-modelRT_config_dir=/app/configs" - "-modelRT_config_name=config" diff --git a/deploy/k8s/mongodb-service.yaml b/deploy/k8s/mongodb-service.yaml index daf946a..8345287 100644 --- a/deploy/k8s/mongodb-service.yaml +++ b/deploy/k8s/mongodb-service.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Service metadata: - name: mongodb + name: mongodb-service labels: app: mongodb spec: diff --git a/deploy/k8s/pg-pvc.yaml b/deploy/k8s/pg-pvc.yaml index f172ce7..d3bd112 100644 --- a/deploy/k8s/pg-pvc.yaml +++ b/deploy/k8s/pg-pvc.yaml @@ -7,4 +7,4 @@ spec: - ReadWriteOnce resources: requests: - storage: 2Gi + storage: 6Gi diff --git a/deploy/k8s/pg-service.yaml b/deploy/k8s/pg-service.yaml index 5e11fe0..d525be9 100644 --- a/deploy/k8s/pg-service.yaml +++ b/deploy/k8s/pg-service.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Service metadata: - name: postgres + name: postgres-service labels: app: postgres spec: diff --git a/deploy/k8s/promtail-configmap.yaml b/deploy/k8s/promtail-configmap.yaml 
index 0ccf089..888350d 100644 --- a/deploy/k8s/promtail-configmap.yaml +++ b/deploy/k8s/promtail-configmap.yaml @@ -13,7 +13,7 @@ data: filename: /tmp/positions.yaml clients: - - url: http://loki:3100/loki/api/v1/push + - url: http://loki-service:3100/loki/api/v1/push scrape_configs: - job_name: kubernetes-pods diff --git a/logger/gorm_logger.go b/logger/gorm_logger.go index fadd2e6..2387b5f 100644 --- a/logger/gorm_logger.go +++ b/logger/gorm_logger.go @@ -4,6 +4,7 @@ package logger import ( "context" "errors" + "fmt" "time" "gorm.io/gorm" @@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface { // Info define func for implementing gormLogger.Interface func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) { - Info(ctx, msg, "data", data) + Info(ctx, fmt.Sprintf(msg, data...)) } // Warn define func for implementing gormLogger.Interface func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) { - Warn(ctx, msg, "data", data) + Warn(ctx, fmt.Sprintf(msg, data...)) } // Error define func for implementing gormLogger.Interface func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) { - Error(ctx, msg, "data", data) + Error(ctx, fmt.Sprintf(msg, data...)) } // Trace define func for implementing gormLogger.Interface diff --git a/task/queue_producer.go b/task/queue_producer.go index cfacad6..5019d6b 100644 --- a/task/queue_producer.go +++ b/task/queue_producer.go @@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error { // Declare durable direct exchange err := p.ch.ExchangeDeclare( constants.TaskExchangeName, // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare exchange: %w", err) @@ -81,12 +81,12 @@ func (p 
*QueueProducer) declareInfrastructure() error { // Declare durable queue with priority support and message TTL _, err = p.ch.QueueDeclare( constants.TaskQueueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait amqp.Table{ - "x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10 + "x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10 "x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL }, ) @@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error { constants.TaskQueueName, // queue name constants.TaskRoutingKey, // routing key constants.TaskExchangeName, // exchange name - false, // no-wait - nil, // arguments + false, // no-wait + nil, // arguments ) if err != nil { return fmt.Errorf("failed to bind queue: %w", err) @@ -148,8 +148,8 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT ctx, constants.TaskExchangeName, // exchange constants.TaskRoutingKey, // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate publishing, ) if err != nil { @@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error { func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) { queue, err := p.ch.QueueDeclarePassive( constants.TaskQueueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait amqp.Table{ "x-max-priority": constants.TaskMaxPriority, "x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), @@ -246,22 +246,22 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan case <-ctx.Done(): logger.Info(ctx, "push task to RabbitMQ stopped by context cancel") return - case msg, ok := <-taskChan: + 
case task, ok := <-taskChan: if !ok { logger.Info(ctx, "task channel closed, exiting push loop") return } // Restore trace context from the handler that enqueued this message - taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) + taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier)) taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish", - oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())), + oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())), ) - if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil { + if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil { pubSpan.RecordError(err) logger.Error(taskCtx, "publish task to RabbitMQ failed", - "task_id", msg.TaskID, "error", err) + "task_id", task.TaskID, "error", err) } pubSpan.End() } } -} \ No newline at end of file +} From c4e892f1c76877977e12062633dd612551cc15db Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 8 Jun 2026 17:05:21 +0800 Subject: [PATCH 04/12] fix: correct typo in Jaeger K8s service name - rename `jaeger-serivce` to `jaeger-service` in jaeger-service.yaml --- deploy/k8s/jaeger-service.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy/k8s/jaeger-service.yaml b/deploy/k8s/jaeger-service.yaml index ea60d1c..86eb357 100644 --- a/deploy/k8s/jaeger-service.yaml +++ b/deploy/k8s/jaeger-service.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Service metadata: - name: jaeger-serivce + name: jaeger-service labels: app: jaeger spec: From 05c64dda141606eec38c06ab09cc762e818c88c4 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 10 Jun 2026 16:40:50 +0800 Subject: [PATCH 05/12] chore: add imagePullPolicy and migrate WaitGroup to wg.Go - add imagePullPolicy: IfNotPresent to all k8s Deployments, DaemonSet (grafana, 
jaeger, loki, rabbitmq, redis, promtail) - migrate wg.Add(1)/go/defer wg.Done() pattern to wg.Go() (Go 1.25+) in logger/loki_syncer.go and task/worker.go - simplify redundant map existence check before delete in diagram/graph.go - update deploy.md to reflect pg PVC size (6Gi) and resource limits --- .gitignore | 5 +++-- deploy/deploy.md | 6 +++++- deploy/k8s/grafana-deployment.yaml | 1 + deploy/k8s/jaeger-deployment.yaml | 1 + deploy/k8s/loki-deployment.yaml | 1 + deploy/k8s/mongodb-statefulset.yaml | 14 +++++++------- deploy/k8s/promtail-daemonset.yaml | 1 + deploy/k8s/rabbitmq-deployment.yaml | 1 + deploy/k8s/redis-deployment.yaml | 1 + diagram/graph.go | 4 +--- logger/loki_syncer.go | 4 +--- task/worker.go | 10 ++-------- 12 files changed, 25 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index b338b88..164a18c 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,7 @@ go.work .vscode -.idea +.idea # Shield all log files in the log folder /log/ # Shield config files in the configs folder @@ -32,6 +32,7 @@ go.work # ai config .cursor/ .claude/ +.codewhale/ .cursorrules .copilot/ .chatgpt/ @@ -39,4 +40,4 @@ go.work .vector_cache/ ai-debug.log *.patch -*.diff \ No newline at end of file +*.diff diff --git a/deploy/deploy.md b/deploy/deploy.md index 57c50f1..03d4b7f 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -695,7 +695,11 @@ kubectl apply -f deploy/k8s/pg-service.yaml | **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` | | **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` | | **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret | -| **存储** | `2Gi` | PVC `postgres-data` | +| **存储** | `6Gi` | PVC `postgres-data` | +| **CPU** | `100m` 请求 / `500m` 上限 | StatefulSet `resources` 字段 | +| **内存** | `256Mi` 请求 / `512Mi` 上限 | StatefulSet `resources` 字段 | + +> **注意:** 密码当前以明文形式存储在 `pg-configmap.yaml` 中,生产环境应将其迁移至 K8s Secret,并通过环境变量注入容器,避免将明文密码提交至版本库。 ##### 4.4.1 等待 Pod 就绪 diff --git a/deploy/k8s/grafana-deployment.yaml 
b/deploy/k8s/grafana-deployment.yaml index 9f23045..b1f7092 100644 --- a/deploy/k8s/grafana-deployment.yaml +++ b/deploy/k8s/grafana-deployment.yaml @@ -16,6 +16,7 @@ spec: containers: - name: grafana image: grafana/grafana:10.4.2 + imagePullPolicy: IfNotPresent ports: - containerPort: 3000 env: diff --git a/deploy/k8s/jaeger-deployment.yaml b/deploy/k8s/jaeger-deployment.yaml index c0444b7..dd73c20 100644 --- a/deploy/k8s/jaeger-deployment.yaml +++ b/deploy/k8s/jaeger-deployment.yaml @@ -15,6 +15,7 @@ spec: containers: - name: jaeger image: jaegertracing/all-in-one:1.56 + imagePullPolicy: IfNotPresent env: - name: COLLECTOR_OTLP_ENABLED value: "true" diff --git a/deploy/k8s/loki-deployment.yaml b/deploy/k8s/loki-deployment.yaml index 63ff925..b4b8531 100644 --- a/deploy/k8s/loki-deployment.yaml +++ b/deploy/k8s/loki-deployment.yaml @@ -20,6 +20,7 @@ spec: containers: - name: loki image: grafana/loki:2.9.4 + imagePullPolicy: IfNotPresent args: - -config.file=/etc/loki/loki.yaml ports: diff --git a/deploy/k8s/mongodb-statefulset.yaml b/deploy/k8s/mongodb-statefulset.yaml index 8d1de21..708caa6 100644 --- a/deploy/k8s/mongodb-statefulset.yaml +++ b/deploy/k8s/mongodb-statefulset.yaml @@ -34,9 +34,9 @@ spec: - mongosh - --eval - "db.adminCommand('ping')" - initialDelaySeconds: 10 - periodSeconds: 5 - timeoutSeconds: 3 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 10 failureThreshold: 12 livenessProbe: exec: @@ -44,10 +44,10 @@ spec: - mongosh - --eval - "db.adminCommand('ping')" - initialDelaySeconds: 30 - periodSeconds: 20 - timeoutSeconds: 3 - failureThreshold: 3 + initialDelaySeconds: 120 + periodSeconds: 10 + timeoutSeconds: 30 + failureThreshold: 5 resources: requests: cpu: 100m diff --git a/deploy/k8s/promtail-daemonset.yaml b/deploy/k8s/promtail-daemonset.yaml index eedf72d..3d5f90e 100644 --- a/deploy/k8s/promtail-daemonset.yaml +++ b/deploy/k8s/promtail-daemonset.yaml @@ -19,6 +19,7 @@ spec: containers: - name: promtail image: 
grafana/promtail:2.9.4 + imagePullPolicy: IfNotPresent args: - -config.file=/etc/promtail/promtail.yaml ports: diff --git a/deploy/k8s/rabbitmq-deployment.yaml b/deploy/k8s/rabbitmq-deployment.yaml index 758daca..f8762cf 100644 --- a/deploy/k8s/rabbitmq-deployment.yaml +++ b/deploy/k8s/rabbitmq-deployment.yaml @@ -15,6 +15,7 @@ spec: containers: - name: rabbitmq image: rabbitmq:4.1.1-management-alpine + imagePullPolicy: IfNotPresent ports: - containerPort: 4369 - containerPort: 5671 diff --git a/deploy/k8s/redis-deployment.yaml b/deploy/k8s/redis-deployment.yaml index b2f08fc..6af2261 100644 --- a/deploy/k8s/redis-deployment.yaml +++ b/deploy/k8s/redis-deployment.yaml @@ -15,6 +15,7 @@ spec: containers: - name: redis image: redis/redis-stack-server:latest + imagePullPolicy: IfNotPresent resources: limits: memory: "128Mi" diff --git a/diagram/graph.go b/diagram/graph.go index 6c4970d..40b6900 100644 --- a/diagram/graph.go +++ b/diagram/graph.go @@ -65,9 +65,7 @@ func (g *Graph) AddEdge(from, to uuid.UUID) { // 创建新的拓扑信息时,如果被链接的点已经存在于游离节点中 // 则将其移除 - if _, exist := g.FreeVertexs[toKey]; exist { - delete(g.FreeVertexs, toKey) - } + delete(g.FreeVertexs, toKey) } // DelNode delete a node to the graph diff --git a/logger/loki_syncer.go b/logger/loki_syncer.go index 332ddcf..0876ab5 100644 --- a/logger/loki_syncer.go +++ b/logger/loki_syncer.go @@ -47,8 +47,7 @@ func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer { client: &http.Client{Timeout: 5 * time.Second}, ch: make(chan string, 512), } - ls.wg.Add(1) - go ls.run() + ls.wg.Go(ls.run) return ls } @@ -70,7 +69,6 @@ func (ls *lokiSyncer) Sync() error { } func (ls *lokiSyncer) run() { - defer ls.wg.Done() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() diff --git a/task/worker.go b/task/worker.go index f9ceeed..9632a77 100644 --- a/task/worker.go +++ b/task/worker.go @@ -185,13 +185,11 @@ func (w *TaskWorker) Start() error { // Start multiple consumers for better throughput for i := 0; i < 
w.cfg.QueueConsumerCount; i++ { - w.wg.Add(1) - go w.consumerLoop(i) + w.wg.Go(func() { w.consumerLoop(i) }) } // Start health check goroutine - w.wg.Add(1) - go w.healthCheckLoop() + w.wg.Go(w.healthCheckLoop) logger.Info(w.ctx, "task worker started successfully") return nil @@ -199,8 +197,6 @@ func (w *TaskWorker) Start() error { // consumerLoop runs a single RabbitMQ consumer func (w *TaskWorker) consumerLoop(consumerID int) { - defer w.wg.Done() - logger.Info(w.ctx, "starting consumer", "consumer_id", consumerID) // Consume messages from the queue @@ -478,8 +474,6 @@ func (w *TaskWorker) dispatch(ctx context.Context, taskType TaskType, taskID uui // healthCheckLoop periodically checks worker health and metrics func (w *TaskWorker) healthCheckLoop() { - defer w.wg.Done() - ticker := time.NewTicker(w.cfg.PollingInterval) defer ticker.Stop() From 64b6562784305f97cef9b60f71914d1f9b6cf4a5 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 10 Jun 2026 16:42:29 +0800 Subject: [PATCH 06/12] =?UTF-8?q?docs:=20overhaul=20deploy.md=20cleanup=20?= =?UTF-8?q?and=20pg=20verification=20sections=20=20=20-=20add=20pg=20conne?= =?UTF-8?q?ction=20verification=20commands=20(pg=5Fisready,=20psql=20queri?= =?UTF-8?q?es)=20=20=20-=20renumber=20pg=20subsections=20(4.4.2=E2=86=924.?= =?UTF-8?q?4.5)=20to=20accommodate=20new=20section=20=20=20-=20remove=20Mo?= =?UTF-8?q?ngoDB=20deploy=20section=20(section=204.5)=20from=20modelRT=20d?= =?UTF-8?q?eploy=20guide=20=20=20-=20remove=20MongoDB=20SSH=20tunnel=20por?= =?UTF-8?q?t-forward=20entries=20(27017/30017)=20=20=20-=20rewrite=20secti?= =?UTF-8?q?on=208=20cleanup=20guide:=20split=20into=20local=20Docker,=20lo?= =?UTF-8?q?cal=20run,=20=20=20=20=20and=20K8s(Minikube)=20categories=20wit?= =?UTF-8?q?h=20scale-down=20and=20full-delete=20options=20=20=20-=20add=20?= =?UTF-8?q?one-liner=20kubectl=20delete=20-f=20deploy/k8s/=20for=20full=20?= =?UTF-8?q?teardown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
--- deploy/deploy.md | 180 +++++++++++++++++++++++++++++++---------------- 1 file changed, 121 insertions(+), 59 deletions(-) diff --git a/deploy/deploy.md b/deploy/deploy.md index 03d4b7f..27e27bb 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -707,7 +707,23 @@ kubectl apply -f deploy/k8s/pg-service.yaml kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s ``` -##### 4.4.2 初始化异步任务表 +##### 4.4.2 连接验证 + +```bash +# 快速检查 PostgreSQL 是否接受连接 +kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \ + -- pg_isready -U postgres -d demo + +# 进入 psql 执行简单查询确认数据库可用 +kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \ + -- psql -U postgres -d demo -c "SELECT current_database(), version();" + +# 列出所有数据库(确认 demo 库已创建) +kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \ + -- psql -U postgres -c "\l" +``` + +##### 4.4.3 初始化异步任务表 PostgreSQL 就绪后执行 1.4 节的建表 SQL,可通过以下方式进入容器执行: @@ -721,14 +737,14 @@ kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metada -- psql -U postgres -d demo < /path/to/init.sql ``` -##### 4.4.3 状态检查 +##### 4.4.4 状态检查 ```bash kubectl get pods -l app=postgres kubectl logs -l app=postgres --tail=30 ``` -##### 4.4.4 清理 +##### 4.4.5 清理 ```bash kubectl delete -f deploy/k8s/pg-service.yaml \ @@ -737,54 +753,6 @@ kubectl delete -f deploy/k8s/pg-service.yaml \ -f deploy/k8s/pg-configmap.yaml ``` -#### 4.5 部署 MongoDB - -```bash -kubectl apply -f deploy/k8s/mongodb-secret.yaml -kubectl apply -f deploy/k8s/mongodb-pvc.yaml -kubectl apply -f deploy/k8s/mongodb-statefulset.yaml -kubectl apply -f deploy/k8s/mongodb-service.yaml -``` - -| 参数 | 值 | 说明 | -| :--- | :--- | :--- | -| **镜像** | `mongo:7.0` | MongoDB 7.0 | -| **NodePort** | `30017` | 集群外访问端口 | -| **用户名** | `admin` | Root 管理员 | -| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 | -| **存储** | `2Gi` | PVC `mongodb-data` | - -> **注意:** 
密码存储在 `mongodb-secret.yaml` 的 `stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。 - -##### 4.5.1 等待 Pod 就绪 - -```bash -kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s -``` - -##### 4.5.2 连接验证 - -```bash -kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \ - -- mongosh -u admin -p coslight --authenticationDatabase admin -``` - -##### 4.5.3 状态检查 - -```bash -kubectl get pods -l app=mongodb -kubectl logs -l app=mongodb --tail=30 -``` - -##### 4.5.4 清理 - -```bash -kubectl delete -f deploy/k8s/mongodb-service.yaml \ - -f deploy/k8s/mongodb-statefulset.yaml \ - -f deploy/k8s/mongodb-pvc.yaml \ - -f deploy/k8s/mongodb-secret.yaml -``` - ### 5\. 部署 ModelRT(Kubernetes) 所有资源部署在 `default` 命名空间,YAML 文件位于 `deploy/k8s/`。 @@ -1012,7 +980,6 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101) ```bash ssh -L 5432:192.168.49.2:30432 \ - -L 27017:192.168.49.2:30017 \ -L 5671:192.168.49.2:30671 \ -L 15671:192.168.49.2:31671 \ -L 6379:192.168.49.2:30001 \ @@ -1028,7 +995,6 @@ ssh -L 5432:192.168.49.2:30432 \ ```bash ssh -fN \ -L 5432:192.168.49.2:30432 \ - -L 27017:192.168.49.2:30017 \ -L 5671:192.168.49.2:30671 \ -L 15671:192.168.49.2:31671 \ -L 6379:192.168.49.2:30001 \ @@ -1044,7 +1010,6 @@ ssh -fN \ | Mac 本地端口 | Minikube NodePort | 服务 | 说明 | | :--- | :--- | :--- | :--- | | `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` | -| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` | | `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 | | `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` | | `6379` | `30001` | Redis | 分布式锁 / 数据存储 | @@ -1068,14 +1033,111 @@ kill ### 8\. 
后续操作(停止与清理) -#### 8.1 停止容器 +#### 8.1 本地 Docker 部署清理 + +适用于第 1、2 节使用 `docker run` 启动的 PostgreSQL 和 Redis 容器。 ```bash +# 停止容器 docker stop postgres redis -``` -#### 8.2 删除容器(删除后数据将丢失) - -```bash +# 删除容器(容器内数据将同步丢失) docker rm postgres redis ``` + +#### 8.2 本地运行清理 + +适用于第 3 节以 `go run` 或编译后二进制方式在本地启动的 ModelRT 服务。 + +前台运行时直接 `Ctrl+C` 终止;后台运行时查找并终止进程: + +```bash +# 终止 go run 启动的进程 +pkill -f "go run main.go" + +# 或终止编译后的二进制进程 +pkill model-rt +``` + +#### 8.3 K8s(Minikube) 部署清理 + +适用于第 4、5、6 节在 Minikube 中部署的所有资源。 + +##### 8.3.1 分服务清理 + +**仅停止(缩容至 0,PVC 数据保留)** + +将所有 Deployment 和 StatefulSet 缩容至 0 副本,Pod 停止运行但持久卷数据不删除,之后可直接缩容回 1 恢复服务。 + +```bash +# 停止所有 Deployment(Redis / RabbitMQ / ModelRT / Jaeger / Loki / Grafana) +kubectl scale deployment --all --replicas=0 + +# 停止所有 StatefulSet(PostgreSQL,PVC 数据保留) +kubectl scale statefulset --all --replicas=0 +``` + +恢复时: + +```bash +kubectl scale deployment --all --replicas=1 +kubectl scale statefulset --all --replicas=1 +``` + +> **注意:** DaemonSet(Promtail)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/promtail-daemonset.yaml`。 + +--- + +**永久清理(删除所有资源,包含 PVC,数据不可恢复)** + +按部署顺序反向删除各服务资源: + +```bash +# 可观测性栈(Grafana / Promtail / Loki / Jaeger) +kubectl delete -f deploy/k8s/grafana-service.yaml \ + -f deploy/k8s/grafana-deployment.yaml \ + -f deploy/k8s/grafana-configmap.yaml \ + -f deploy/k8s/promtail-daemonset.yaml \ + -f deploy/k8s/promtail-configmap.yaml \ + -f deploy/k8s/promtail-rbac.yaml \ + -f deploy/k8s/loki-service.yaml \ + -f deploy/k8s/loki-deployment.yaml \ + -f deploy/k8s/loki-pvc.yaml \ + -f deploy/k8s/loki-configmap.yaml \ + -f deploy/k8s/jaeger-service.yaml \ + -f deploy/k8s/jaeger-deployment.yaml + +# ModelRT 应用 +kubectl delete -f deploy/k8s/modelrt-service.yaml \ + -f deploy/k8s/modelrt-deployment.yaml \ + -f deploy/k8s/modelrt-configmap.yaml \ + -f deploy/k8s/modelrt-secret.yaml +kubectl delete secret modelrt-certs + +# PostgreSQL +kubectl delete -f deploy/k8s/pg-service.yaml \ + -f 
deploy/k8s/pg-statefulset.yaml \ + -f deploy/k8s/pg-pvc.yaml \ + -f deploy/k8s/pg-configmap.yaml + +# RabbitMQ +kubectl delete -f deploy/k8s/rabbitmq-service.yaml \ + -f deploy/k8s/rabbitmq-deployment.yaml \ + -f deploy/k8s/rabbitmq-users-config.yaml \ + -f deploy/k8s/rabbitmq-config.yaml \ + -f deploy/k8s/rabbitmq-secret.yaml +kubectl delete secret rabbitmq-certs + +# Redis +kubectl delete -f deploy/k8s/redis-service.yaml \ + -f deploy/k8s/redis-deployment.yaml +``` + +##### 8.3.2 一键清理 + +> **注意:** 此操作会删除 `deploy/k8s/` 下所有 YAML 对应的 K8s 资源,包括 PVC,**持久化数据将永久丢失**,请确认后执行。 + +```bash +kubectl delete -f deploy/k8s/ +kubectl delete secret rabbitmq-certs modelrt-certs +``` From 908c71356571b3f2198be8a16416316bd95c6bdf Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 12 Jun 2026 11:20:58 +0800 Subject: [PATCH 07/12] chore: add rabbitmq cert secret script and plugins configmap - add rabbitmq-certs-secret.sh helper to create the server cert secret - add rabbitmq-plugins-config.yaml ConfigMap enabling ssl auth, management, prometheus, and web dispatch plugins - rename rabbitmq Deployment from `eventrt-rabbitmq` to `rabbitmq` - document the secret-creation script in deploy.md --- deploy/deploy.md | 6 ++++++ deploy/k8s/rabbitmq-certs-secret.sh | 14 ++++++++++++++ deploy/k8s/rabbitmq-deployment.yaml | 2 +- deploy/k8s/rabbitmq-plugins-config.yaml | 7 +++++++ 4 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 deploy/k8s/rabbitmq-certs-secret.sh create mode 100644 deploy/k8s/rabbitmq-plugins-config.yaml diff --git a/deploy/deploy.md b/deploy/deploy.md index 27e27bb..18f14b9 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -640,6 +640,12 @@ openssl x509 -in eventrt_client_cert.pem -noout -subject 将服务器端三个证书文件打包为 K8s Secret(在证书文件所在目录执行): +```bash +sh deploy/k8s/rabbitmq-certs-secret.sh +``` + +该脚本等价于: + ```bash kubectl create secret generic rabbitmq-certs \ --from-file=ca_certificate.pem=./ca_certificate.pem \ diff --git a/deploy/k8s/rabbitmq-certs-secret.sh 
b/deploy/k8s/rabbitmq-certs-secret.sh new file mode 100644 index 0000000..404617c --- /dev/null +++ b/deploy/k8s/rabbitmq-certs-secret.sh @@ -0,0 +1,14 @@ +#!/bin/sh +# Create the rabbitmq server certificate secret. +# Run this script from the directory that contains the three cert files, +# or adjust the paths below to point at the actual files. +# +# Expected files (generated during RabbitMQ TLS setup): +# ca_certificate.pem +# server_certificate.pem +# server_key.pem + +kubectl create secret generic rabbitmq-certs \ + --from-file=ca_certificate.pem=./ca_certificate.pem \ + --from-file=server_certificate.pem=./server_certificate.pem \ + --from-file=server_key.pem=./server_key.pem diff --git a/deploy/k8s/rabbitmq-deployment.yaml b/deploy/k8s/rabbitmq-deployment.yaml index f8762cf..3dd645f 100644 --- a/deploy/k8s/rabbitmq-deployment.yaml +++ b/deploy/k8s/rabbitmq-deployment.yaml @@ -1,7 +1,7 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: eventrt-rabbitmq + name: rabbitmq spec: replicas: 1 selector: diff --git a/deploy/k8s/rabbitmq-plugins-config.yaml b/deploy/k8s/rabbitmq-plugins-config.yaml new file mode 100644 index 0000000..f6fe7d9 --- /dev/null +++ b/deploy/k8s/rabbitmq-plugins-config.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbit-plugins-conf +data: + enabled_plugins: | + [rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch]. 
From 82622d0d858e8ea717d51e76a89f41e4d04131c9 Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 16 Jun 2026 16:15:28 +0800 Subject: [PATCH 08/12] refactor: add generic helpers and type-safe TypedMap wrapper - add util.TypedMap, a generic wrapper over sync.Map to drop call-site type assertions - add generic util.MapSlice and reuse it in ConvertZSetMembersToFloat64 - make GetKeysFromSet/SliceToSet/RemoveTargetsFromSliceSimple/DeduplicateAndReportDuplicates generic - migrate graphOverview to util.TypedMap[int64, *Graph] - build redis suggestions via util.MapSlice in measurement group recommend --- diagram/topologic_set.go | 18 +++----- model/measurement_group_recommend_model.go | 28 +++++------- util/convert.go | 17 +++++--- util/map.go | 16 ++++--- util/string.go | 23 +++++----- util/typed_map.go | 51 ++++++++++++++++++++++ 6 files changed, 101 insertions(+), 52 deletions(-) create mode 100644 util/typed_map.go diff --git a/diagram/topologic_set.go b/diagram/topologic_set.go index ccc17a9..4c96a4a 100644 --- a/diagram/topologic_set.go +++ b/diagram/topologic_set.go @@ -2,32 +2,28 @@ package diagram import ( - "errors" "fmt" - "sync" + + "modelRT/util" ) -// graphOverview define struct of storage all circuit diagram topologic data -var graphOverview sync.Map +// graphOverview define struct of storage all circuit diagram topologic data keyed by pageID +var graphOverview util.TypedMap[int64, *Graph] // PrintGrapMap define func of print circuit diagram topologic info data func PrintGrapMap() { - graphOverview.Range(func(key, value any) bool { - fmt.Println(key, value) + graphOverview.Range(func(pageID int64, graph *Graph) bool { + fmt.Println(pageID, graph) return true }) } // GetGraphMap define func of get circuit diagram topologic data by pageID func GetGraphMap(pageID int64) (*Graph, error) { - value, ok := graphOverview.Load(pageID) + graph, ok := graphOverview.Load(pageID) if !ok { return nil, fmt.Errorf("can not find graph by pageID:%d", pageID) } - graph, ok := 
value.(*Graph) - if !ok { - return nil, errors.New("convert to graph struct failed") - } return graph, nil } diff --git a/model/measurement_group_recommend_model.go b/model/measurement_group_recommend_model.go index bcb9f52..02babc8 100644 --- a/model/measurement_group_recommend_model.go +++ b/model/measurement_group_recommend_model.go @@ -10,6 +10,7 @@ import ( "modelRT/diagram" "modelRT/logger" "modelRT/orm" + "modelRT/util" "github.com/RediSearch/redisearch-go/v2/redisearch" ) @@ -63,10 +64,9 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement } safeSAdd(constants.RedisAllGridSetKey, measSet.AllGridTags) - gridSug := make([]redisearch.Suggestion, 0, len(measSet.AllGridTags)) - for _, gridTag := range measSet.AllGridTags { - gridSug = append(gridSug, redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore}) - } + gridSug := util.MapSlice(measSet.AllGridTags, func(gridTag string) redisearch.Suggestion { + return redisearch.Suggestion{Term: gridTag, Score: constants.DefaultScore} + }) ac.AddTerms(gridSug...) safeSAdd(constants.RedisAllZoneSetKey, measSet.AllZoneTags) @@ -78,19 +78,16 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement // building the grid -> zones hierarchy for gridTag, zoneTags := range measSet.GridToZoneTags { - sug := make([]redisearch.Suggestion, 0, len(zoneTags)) - for _, zoneTag := range zoneTags { - term := fmt.Sprintf("%s.%s", gridTag, zoneTag) - // add redis fuzzy search suggestion for token1-token7 type - sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore}) - } + // add redis fuzzy search suggestion for token1-token7 type + sug := util.MapSlice(zoneTags, func(zoneTag string) redisearch.Suggestion { + return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s", gridTag, zoneTag), Score: constants.DefaultScore} + }) safeSAdd(fmt.Sprintf(constants.RedisSpecGridZoneSetKey, gridTag), zoneTags) ac.AddTerms(sug...) 
} // building the zone -> stations hierarchy for zoneTag, stationTags := range measSet.ZoneToStationTags { - sug := make([]redisearch.Suggestion, 0, len(stationTags)) gridTag, exists := zoneToGridPath[zoneTag] if !exists { err := fmt.Errorf("zone tag to grid tag mapping not found for zoneTag: %s", zoneTag) @@ -98,11 +95,10 @@ func TraverseMeasurementGroupTables(ctx context.Context, measSet orm.Measurement return nil, nil, err } - for _, stationTag := range stationTags { - // add redis fuzzy search suggestion for token1-token7 type - term := fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag) - sug = append(sug, redisearch.Suggestion{Term: term, Score: constants.DefaultScore}) - } + // add redis fuzzy search suggestion for token1-token7 type + sug := util.MapSlice(stationTags, func(stationTag string) redisearch.Suggestion { + return redisearch.Suggestion{Term: fmt.Sprintf("%s.%s.%s", gridTag, zoneTag, stationTag), Score: constants.DefaultScore} + }) safeSAdd(fmt.Sprintf(constants.RedisSpecZoneStationSetKey, zoneTag), stationTags) ac.AddTerms(sug...) diff --git a/util/convert.go b/util/convert.go index 6e4f04f..c4d378b 100644 --- a/util/convert.go +++ b/util/convert.go @@ -7,15 +7,22 @@ import ( "github.com/redis/go-redis/v9" ) +// MapSlice define func to build a new slice by applying f to every element of s. 
+func MapSlice[T, U any](s []T, f func(T) U) []U { + result := make([]U, 0, len(s)) + for _, item := range s { + result = append(result, f(item)) + } + return result +} + // ConvertZSetMembersToFloat64 define func to conver zset member type to float64 func ConvertZSetMembersToFloat64(members []redis.Z) []float64 { - dataFloats := make([]float64, 0, len(members)) // recovery time sorted in ascending order sortRedisZByTimeMemberAscending(members) - for _, member := range members { - dataFloats = append(dataFloats, member.Score) - } - return dataFloats + return MapSlice(members, func(member redis.Z) float64 { + return member.Score + }) } func sortRedisZByTimeMemberAscending(data []redis.Z) { diff --git a/util/map.go b/util/map.go index e6f3116..8cd7303 100644 --- a/util/map.go +++ b/util/map.go @@ -1,11 +1,13 @@ // Package util provide some utility functions package util -// GetKeysFromSet define func to get all keys from a map[string]struct{} -func GetKeysFromSet(set map[string]struct{}) []string { - keys := make([]string, 0, len(set)) - for key := range set { - keys = append(keys, key) - } - return keys +import ( + "maps" + "slices" +) + +// GetKeysFromSet define func to get all keys from a set-like map. +// It delegates to the standard library maps/slices helpers. 
+func GetKeysFromSet[K comparable, V any](set map[K]V) []K { + return slices.Collect(maps.Keys(set)) } diff --git a/util/string.go b/util/string.go index 47e2e43..d01c098 100644 --- a/util/string.go +++ b/util/string.go @@ -1,12 +1,9 @@ // Package util provide some utility functions package util -// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings -func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string { - targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove)) - for _, target := range targetsToRemove { - targetsToRemoveSet[target] = struct{}{} - } +// RemoveTargetsFromSliceSimple define func to remove targets from a slice +func RemoveTargetsFromSliceSimple[T comparable](targetsSlice []T, targetsToRemove []T) []T { + targetsToRemoveSet := SliceToSet(targetsToRemove) for i := len(targetsSlice) - 1; i >= 0; i-- { if _, found := targetsToRemoveSet[targetsSlice[i]]; found { @@ -17,21 +14,21 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin return targetsSlice } -// SliceToSet define func to convert string slice to set -func SliceToSet(targetsSlice []string) map[string]struct{} { - set := make(map[string]struct{}, len(targetsSlice)) +// SliceToSet define func to convert a slice to a set +func SliceToSet[T comparable](targetsSlice []T) map[T]struct{} { + set := make(map[T]struct{}, len(targetsSlice)) for _, target := range targetsSlice { set[target] = struct{}{} } return set } -// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates -func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) { +// DeduplicateAndReportDuplicates define func to deduplicate a slice and report duplicates +func DeduplicateAndReportDuplicates[T comparable](targetsSlice []T, sourceSlice []T) (deduplicated []T, duplicates []T) { targetSet := SliceToSet(targetsSlice) - 
deduplicated = make([]string, 0, len(sourceSlice)) + deduplicated = make([]T, 0, len(sourceSlice)) // duplicate items slice - duplicates = make([]string, 0, len(sourceSlice)) + duplicates = make([]T, 0, len(sourceSlice)) for _, source := range sourceSlice { if _, found := targetSet[source]; found { diff --git a/util/typed_map.go b/util/typed_map.go new file mode 100644 index 0000000..6693668 --- /dev/null +++ b/util/typed_map.go @@ -0,0 +1,51 @@ +// Package util provide some utility functions +package util + +import "sync" + +// TypedMap define a type-safe generic wrapper around sync.Map. +// It keeps the concurrency guarantees of sync.Map while removing the +// per-call-site type assertions previously required for every Load. +type TypedMap[K comparable, V any] struct { + m sync.Map +} + +// Load define func of return the value stored for key, and whether it was found. +func (t *TypedMap[K, V]) Load(key K) (V, bool) { + value, ok := t.m.Load(key) + if !ok { + var zero V + return zero, false + } + // safe: only values of type V are ever stored through this wrapper + return value.(V), true +} + +// Store define func of set the value for key. +func (t *TypedMap[K, V]) Store(key K, value V) { + t.m.Store(key, value) +} + +// Swap define func of store value for key and return the previous value (if any). +// loaded reports whether a value was already present for key. +func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) { + prev, loaded := t.m.Swap(key, value) + if !loaded { + var zero V + return zero, false + } + return prev.(V), true +} + +// Delete define func of remove the value for key. +func (t *TypedMap[K, V]) Delete(key K) { + t.m.Delete(key) +} + +// Range define func of iterate over all key/value pairs. +// Iteration stops early if f returns false. 
+func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) { + t.m.Range(func(key, value any) bool { + return f(key.(K), value.(V)) + }) +} From c82ad773a35fd82960637555be9810ceb42ad974 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 17 Jun 2026 10:47:35 +0800 Subject: [PATCH 09/12] refactor: lowercase channel name suffixes and rename PS to PF - change all ChannelSuffix values from uppercase to lowercase - rename ChannelSuffixPS ("PS") to ChannelSuffixPF ("pf") - align channel suffix naming with downstream measurement keys --- constants/measurement.go | 18 ++++----- diagram/anchor_set.go | 14 +++---- diagram/component_set.go | 17 +++------ handler/circuit_diagram_update.go | 2 +- model/measurement_protol_model.go | 61 ++++++++++++++++++------------- 5 files changed, 56 insertions(+), 56 deletions(-) diff --git a/constants/measurement.go b/constants/measurement.go index d95864b..cfaed67 100644 --- a/constants/measurement.go +++ b/constants/measurement.go @@ -19,15 +19,15 @@ const ( // channel name suffix const ( - ChannelSuffixP = "P" - ChannelSuffixQ = "Q" - ChannelSuffixS = "S" - ChannelSuffixPS = "PS" - ChannelSuffixF = "F" - ChannelSuffixDeltaF = "deltaF" - ChannelSuffixUAB = "UAB" - ChannelSuffixUBC = "UBC" - ChannelSuffixUCA = "UCA" + ChannelSuffixP = "p" + ChannelSuffixQ = "q" + ChannelSuffixS = "s" + ChannelSuffixPF = "pf" + ChannelSuffixF = "f" + ChannelSuffixDeltaF = "df" + ChannelSuffixUAB = "uab" + ChannelSuffixUBC = "ubc" + ChannelSuffixUCA = "uca" ) const ( diff --git a/diagram/anchor_set.go b/diagram/anchor_set.go index a6447ca..2923ee7 100644 --- a/diagram/anchor_set.go +++ b/diagram/anchor_set.go @@ -2,24 +2,20 @@ package diagram import ( - "errors" "fmt" - "sync" + + "modelRT/util" ) -// anchorValueOverview define struct of storage all anchor value -var anchorValueOverview sync.Map +// anchorValueOverview define struct of storage all anchor value keyed by component uuid +var anchorValueOverview util.TypedMap[string, string] // GetAnchorValue 
define func of get circuit diagram data by componentID func GetAnchorValue(componentUUID string) (string, error) { - value, ok := diagramsOverview.Load(componentUUID) + anchorValue, ok := anchorValueOverview.Load(componentUUID) if !ok { return "", fmt.Errorf("can not find anchor value by componentUUID:%s", componentUUID) } - anchorValue, ok := value.(string) - if !ok { - return "", errors.New("convert to string failed") - } return anchorValue, nil } diff --git a/diagram/component_set.go b/diagram/component_set.go index da9bddf..c06ad85 100644 --- a/diagram/component_set.go +++ b/diagram/component_set.go @@ -2,32 +2,27 @@ package diagram import ( - "errors" "fmt" - "sync" "modelRT/orm" + "modelRT/util" ) -// diagramsOverview define struct of storage all circuit diagram data -var diagramsOverview sync.Map +// diagramsOverview define struct of storage all circuit diagram data keyed by component uuid +var diagramsOverview util.TypedMap[string, *orm.Component] // GetComponentMap define func of get circuit diagram data by component uuid func GetComponentMap(componentUUID string) (*orm.Component, error) { - value, ok := diagramsOverview.Load(componentUUID) + componentInfo, ok := diagramsOverview.Load(componentUUID) if !ok { return nil, fmt.Errorf("can not find graph by global uuid:%s", componentUUID) } - componentInfo, ok := value.(*orm.Component) - if !ok { - return nil, errors.New("convert to component map struct failed") - } return componentInfo, nil } // UpdateComponentMap define func of update circuit diagram data by component uuid and component info -func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool { - _, result := diagramsOverview.Swap(componentID, componentInfo) +func UpdateComponentMap(componentUUID string, componentInfo *orm.Component) bool { + _, result := diagramsOverview.Swap(componentUUID, componentInfo) return result } diff --git a/handler/circuit_diagram_update.go b/handler/circuit_diagram_update.go index 33bb46b..e28a413 
100644 --- a/handler/circuit_diagram_update.go +++ b/handler/circuit_diagram_update.go @@ -137,7 +137,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { c.JSON(http.StatusOK, resp) return } - diagram.UpdateComponentMap(info.ID, component) + diagram.UpdateComponentMap(info.UUID, component) } if len(request.FreeVertexs) > 0 { diff --git a/model/measurement_protol_model.go b/model/measurement_protol_model.go index 7fdce12..228f51f 100644 --- a/model/measurement_protol_model.go +++ b/model/measurement_protol_model.go @@ -59,36 +59,36 @@ func NewPower104DataSource(station string, packet, offset int) (*MeasurementData } func generateChannelName(prefix string, number int, suffix string) (string, error) { + // shortPrefix is the literal prefix written into the channel name (tm/ts/tc), + // maxNumber is the inclusive upper bound, padWidth is the zero-padded digit width. + var shortPrefix string + var maxNumber, padWidth int switch prefix { case constants.ChannelPrefixTelemetry: - if number > 10 { - return "", common.ErrExceedsLimitType - } - var builder strings.Builder - numberStr := strconv.Itoa(number) - builder.Grow(len(prefix) + len(numberStr) + len(suffix)) - builder.WriteString(prefix) - builder.WriteString(numberStr) - builder.WriteString(suffix) - channelName := builder.String() - return channelName, nil + shortPrefix, maxNumber, padWidth = "tm", 8, 1 case constants.ChannelPrefixTelesignal: - var numberStr string - if number < 10 { - numberStr = "0" + strconv.Itoa(number) - } - numberStr = strconv.Itoa(number) - - var builder strings.Builder - builder.Grow(len(prefix) + len(numberStr) + len(suffix)) - builder.WriteString(prefix) - builder.WriteString(numberStr) - builder.WriteString(suffix) - channelName := builder.String() - return channelName, nil + shortPrefix, maxNumber, padWidth = "ts", 16, 2 + case constants.ChannelPrefixTelecommand: + shortPrefix, maxNumber, padWidth = "tc", 9, 1 default: return "", common.ErrUnsupportedChannelPrefixType } + + if number 
< 1 || number > maxNumber { + return "", common.ErrExceedsLimitType + } + + numberStr := strconv.Itoa(number) + if len(numberStr) < padWidth { + numberStr = strings.Repeat("0", padWidth-len(numberStr)) + numberStr + } + + var builder strings.Builder + builder.Grow(len(shortPrefix) + len(numberStr) + len(suffix)) + builder.WriteString(shortPrefix) + builder.WriteString(numberStr) + builder.WriteString(suffix) + return builder.String(), nil } // NewTelemetryChannel define func of generate telemetry channel CL3611 data source @@ -109,6 +109,15 @@ func NewTelesignalChannel(station, device, channelNameSuffix string, channelNumb return NewCL3611DataSource(station, device, channelName) } +// NewTelecommandChannel define func of generate telecommand channel CL3611 data source +func NewTelecommandChannel(station, device, channelNameSuffix string, channelNumber int) (*MeasurementDataSource, error) { + channelName, err := generateChannelName(constants.ChannelPrefixTelecommand, channelNumber, channelNameSuffix) + if err != nil { + return nil, fmt.Errorf("failed to generate channel name: %w", err) + } + return NewCL3611DataSource(station, device, channelName) +} + // NewStandardChannel define func of generate standard channel CL3611 data source func NewStandardChannel(station, device, channelType string) (*MeasurementDataSource, error) { return NewCL3611DataSource(station, device, channelType) @@ -264,9 +273,9 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) { func concatP104WithPlus(station string, packet int, offset int) string { packetStr := strconv.Itoa(packet) offsetStr := strconv.Itoa(offset) - return station + ":" + packetStr + ":" + offsetStr + return strings.ToLower(station + ":104:" + packetStr + ":" + offsetStr) } func concatCL361WithPlus(station, device, channel string) string { - return station + ":" + device + ":" + "phasor" + ":" + channel + return strings.ToLower(station + ":" + device + ":" + "phasor" + ":" + channel) } From 
ca68cf6c18c9c15f8e74d26e5a046a6ba01de57f Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 18 Jun 2026 16:06:06 +0800 Subject: [PATCH 10/12] refactor: extend TypedMap and migrate MeasComputeState onto it - add LoadOrStore, Len, and All (range-over-func) to util.TypedMap - embed util.TypedMap in MeasComputeState, dropping its hand-written sync.Map wrappers and per-call-site type assertions - iterate graphOverview via All() instead of Range in PrintGrapMap - remove unused Set/Comparer/OrderedSet/HashSet code from redis_zset.go - update deploy.md to replace Promtail with Grafana Alloy in the observability stack --- deploy/deploy.md | 30 ++++++++------ diagram/redis_zset.go | 54 ------------------------- diagram/topologic_set.go | 5 +-- real-time-data/compute_state_manager.go | 51 +++-------------------- util/typed_map.go | 35 +++++++++++++++- 5 files changed, 59 insertions(+), 116 deletions(-) diff --git a/deploy/deploy.md b/deploy/deploy.md index 18f14b9..e8f2e15 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -895,7 +895,9 @@ kubectl delete secret modelrt-certs ### 6\. 
部署可观测性栈(Kubernetes) -在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。 +在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。 + +> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`(DaemonSet)负责,它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。 #### 6.1 部署 Jaeger @@ -913,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml kubectl apply -f deploy/k8s/loki-service.yaml ``` -#### 6.3 部署 Promtail +#### 6.3 部署 Alloy ```bash -kubectl apply -f deploy/k8s/promtail-rbac.yaml -kubectl apply -f deploy/k8s/promtail-configmap.yaml -kubectl apply -f deploy/k8s/promtail-daemonset.yaml +kubectl apply -f deploy/k8s/alloy-rbac.yaml +kubectl apply -f deploy/k8s/alloy-configmap.yaml +kubectl apply -f deploy/k8s/alloy-daemonset.yaml ``` +> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole`(`alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml` 的 `config.alloy` 中。 + #### 6.4 部署 Grafana ```bash @@ -938,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \ -f deploy/k8s/loki-pvc.yaml \ -f deploy/k8s/loki-deployment.yaml \ -f deploy/k8s/loki-service.yaml \ - -f deploy/k8s/promtail-rbac.yaml \ - -f deploy/k8s/promtail-configmap.yaml \ - -f deploy/k8s/promtail-daemonset.yaml \ + -f deploy/k8s/alloy-rbac.yaml \ + -f deploy/k8s/alloy-configmap.yaml \ + -f deploy/k8s/alloy-daemonset.yaml \ -f deploy/k8s/grafana-configmap.yaml \ -f deploy/k8s/grafana-deployment.yaml \ -f deploy/k8s/grafana-service.yaml @@ -1090,7 +1094,7 @@ kubectl scale deployment --all --replicas=1 kubectl scale statefulset --all --replicas=1 ``` -> **注意:** DaemonSet(Promtail)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/promtail-daemonset.yaml`。 +> **注意:** DaemonSet(Alloy)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl 
delete -f deploy/k8s/alloy-daemonset.yaml`。 --- @@ -1099,13 +1103,13 @@ kubectl scale statefulset --all --replicas=1 按部署顺序反向删除各服务资源: ```bash -# 可观测性栈(Grafana / Promtail / Loki / Jaeger) +# 可观测性栈(Grafana / Alloy / Loki / Jaeger) kubectl delete -f deploy/k8s/grafana-service.yaml \ -f deploy/k8s/grafana-deployment.yaml \ -f deploy/k8s/grafana-configmap.yaml \ - -f deploy/k8s/promtail-daemonset.yaml \ - -f deploy/k8s/promtail-configmap.yaml \ - -f deploy/k8s/promtail-rbac.yaml \ + -f deploy/k8s/alloy-daemonset.yaml \ + -f deploy/k8s/alloy-configmap.yaml \ + -f deploy/k8s/alloy-rbac.yaml \ -f deploy/k8s/loki-service.yaml \ -f deploy/k8s/loki-deployment.yaml \ -f deploy/k8s/loki-pvc.yaml \ diff --git a/diagram/redis_zset.go b/diagram/redis_zset.go index d350b4f..6884448 100644 --- a/diagram/redis_zset.go +++ b/diagram/redis_zset.go @@ -3,8 +3,6 @@ package diagram import ( "context" - "iter" - "maps" locker "modelRT/distributedlock" "modelRT/logger" @@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error) } return results, nil } - -type Comparer[T any] interface { - Compare(T) int -} - -type ComparableComparer[T any] interface { - Compare(T) int - comparable // 直接嵌入 comparable 约束 -} - -type methodNode[E Comparer[E]] struct { - value E - left *methodNode[E] - right *methodNode[E] -} - -type MethodTree[E Comparer[E]] struct { - root *methodNode[E] -} - -type OrderedSet[E interface { - comparable - Comparer[E] -}] struct { - tree MethodTree[E] - elements map[E]bool -} - -type ComparableOrderedSet[E ComparableComparer[E]] struct { - tree MethodTree[E] - elements map[E]bool -} - -type Set[E any] interface { - Insert(E) - Delete(E) - Has(E) bool - All() iter.Seq[E] -} - -func InsertAll[E any](set Set[E], seq iter.Seq[E]) { - for v := range seq { - set.Insert(v) - } -} - -type HashSet[E comparable] map[E]bool - -func (s HashSet[E]) Insert(v E) { s[v] = true } -func (s HashSet[E]) Delete(v E) { delete(s, v) } -func (s HashSet[E]) Has(v E) 
bool { return s[v] } -func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) } diff --git a/diagram/topologic_set.go b/diagram/topologic_set.go index 4c96a4a..6413256 100644 --- a/diagram/topologic_set.go +++ b/diagram/topologic_set.go @@ -12,10 +12,9 @@ var graphOverview util.TypedMap[int64, *Graph] // PrintGrapMap define func of print circuit diagram topologic info data func PrintGrapMap() { - graphOverview.Range(func(pageID int64, graph *Graph) bool { + for pageID, graph := range graphOverview.All() { fmt.Println(pageID, graph) - return true - }) + } } // GetGraphMap define func of get circuit diagram topologic data by pageID diff --git a/real-time-data/compute_state_manager.go b/real-time-data/compute_state_manager.go index 33754c4..d0161e7 100644 --- a/real-time-data/compute_state_manager.go +++ b/real-time-data/compute_state_manager.go @@ -2,7 +2,7 @@ package realtimedata import ( - "sync" + "modelRT/util" ) // ComputeConfig define struct of measurement computation @@ -19,54 +19,15 @@ type ComputeConfig struct { Analyzer RealTimeAnalyzer } -// MeasComputeState define struct of manages the state of measurement computations using sync.Map +// MeasComputeState define struct of manages the state of measurement +// computations. It embeds util.TypedMap to inherit the concurrency-safe, +// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without +// per-call-site type assertions. 
type MeasComputeState struct { - measMap sync.Map + util.TypedMap[string, *ComputeConfig] } // NewMeasComputeState define func to create and returns a new instance of MeasComputeState func NewMeasComputeState() *MeasComputeState { return &MeasComputeState{} } - -// Store define func to store a compute configuration for the specified key -func (m *MeasComputeState) Store(key string, config *ComputeConfig) { - m.measMap.Store(key, config) -} - -// Load define func to retrieve the compute configuration for the specified key -func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) { - value, ok := m.measMap.Load(key) - if !ok { - return nil, false - } - return value.(*ComputeConfig), true -} - -// Delete define func to remove the compute configuration for the specified key -func (m *MeasComputeState) Delete(key string) { - m.measMap.Delete(key) -} - -// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration -func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) { - value, loaded := m.measMap.LoadOrStore(key, config) - return value.(*ComputeConfig), loaded -} - -// Range define func to iterate over all key-configuration pairs in the map -func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) { - m.measMap.Range(func(key, value any) bool { - return f(key.(string), value.(*ComputeConfig)) - }) -} - -// Len define func to return the number of compute configurations in the map -func (m *MeasComputeState) Len() int { - count := 0 - m.measMap.Range(func(_, _ any) bool { - count++ - return true - }) - return count -} diff --git a/util/typed_map.go b/util/typed_map.go index 6693668..bd1b1d9 100644 --- a/util/typed_map.go +++ b/util/typed_map.go @@ -1,7 +1,10 @@ // Package util provide some utility functions package util -import "sync" +import ( + "iter" + "sync" +) // TypedMap define a type-safe generic 
wrapper around sync.Map. // It keeps the concurrency guarantees of sync.Map while removing the @@ -26,6 +29,14 @@ func (t *TypedMap[K, V]) Store(key K, value V) { t.m.Store(key, value) } +// LoadOrStore define func of return the existing value for key if present. +// Otherwise it stores and returns the given value. loaded reports whether the +// value was already present. +func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + v, loaded := t.m.LoadOrStore(key, value) + return v.(V), loaded +} + // Swap define func of store value for key and return the previous value (if any). // loaded reports whether a value was already present for key. func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) { @@ -49,3 +60,25 @@ func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) { return f(key.(K), value.(V)) }) } + +// Len define func of return the number of key/value pairs currently stored. +// It walks the map, so the cost is O(n). +func (t *TypedMap[K, V]) Len() int { + count := 0 + t.m.Range(func(_, _ any) bool { + count++ + return true + }) + return count +} + +// All define func of return an iterator over all key/value pairs, +// usable directly with range-over-func. Iteration stops early if the +// consumer breaks out of the loop. 
+func (t *TypedMap[K, V]) All() iter.Seq2[K, V] { + return func(yield func(K, V) bool) { + t.m.Range(func(key, value any) bool { + return yield(key.(K), value.(V)) + }) + } +} From 98a28b62eb44bad78919d2f50c2cfe8a0476d12c Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 22 Jun 2026 16:06:09 +0800 Subject: [PATCH 11/12] feat: add Grafana Alloy log collection manifests for K8s - add alloy-daemonset to run Alloy on every node via DaemonSet - add alloy-configmap to scrape Pod logs through the K8s API and parse zap JSON fields (level, traceID, pod, namespace) into Loki labels - add alloy-rbac granting pods/log read access for log collection - forward parsed logs to loki-service for Grafana querying --- deploy/k8s/alloy-configmap.yaml | 81 +++++++++++++++++++++++++++++++++ deploy/k8s/alloy-daemonset.yaml | 48 +++++++++++++++++++ deploy/k8s/alloy-rbac.yaml | 30 ++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 deploy/k8s/alloy-configmap.yaml create mode 100644 deploy/k8s/alloy-daemonset.yaml create mode 100644 deploy/k8s/alloy-rbac.yaml diff --git a/deploy/k8s/alloy-configmap.yaml b/deploy/k8s/alloy-configmap.yaml new file mode 100644 index 0000000..a11402f --- /dev/null +++ b/deploy/k8s/alloy-configmap.yaml @@ -0,0 +1,81 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: alloy-config + namespace: default +data: + config.alloy: | + // 发现集群内所有 Pod + discovery.kubernetes "pods" { + role = "pod" + } + + // 重写元数据标签,并只保留带 app label 的 Pod + discovery.relabel "pods" { + targets = discovery.kubernetes.pods.targets + + rule { + source_labels = ["__meta_kubernetes_namespace"] + target_label = "namespace" + } + rule { + source_labels = ["__meta_kubernetes_pod_name"] + target_label = "pod" + } + rule { + source_labels = ["__meta_kubernetes_pod_container_name"] + target_label = "container" + } + rule { + source_labels = ["__meta_kubernetes_pod_label_app"] + target_label = "app" + } + // 只采集有 app label 的 Pod + rule { + source_labels = 
["__meta_kubernetes_pod_label_app"] + action = "keep" + regex = ".+" + } + } + + // 通过 Kubernetes API 抓取容器日志(无需挂载宿主机日志目录) + loki.source.kubernetes "pods" { + targets = discovery.relabel.pods.output + forward_to = [loki.process.parse.receiver] + } + + // 解析 zap 输出的 JSON 日志,并将关键字段提升为 Loki Label + loki.process "parse" { + forward_to = [loki.write.default.receiver] + + // 解析结构化字段 + stage.json { + expressions = { + level = "level", + traceID = "traceID", + spanID = "spanID", + caller = "caller", + pod = "pod", + namespace = "namespace", + node = "node", + } + } + + // 提升为 Label,支持在 Grafana 中按实例/Trace 过滤 + stage.labels { + values = { + level = "", + traceID = "", + pod = "", + namespace = "", + node = "", + } + } + } + + // 推送到 Loki + loki.write "default" { + endpoint { + url = "http://loki-service:3100/loki/api/v1/push" + } + } diff --git a/deploy/k8s/alloy-daemonset.yaml b/deploy/k8s/alloy-daemonset.yaml new file mode 100644 index 0000000..1423d23 --- /dev/null +++ b/deploy/k8s/alloy-daemonset.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: alloy + namespace: default +spec: + selector: + matchLabels: + app: alloy + template: + metadata: + labels: + app: alloy + spec: + serviceAccountName: alloy + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + containers: + - name: alloy + image: grafana/alloy:v1.16.3 + imagePullPolicy: IfNotPresent + args: + - run + - /etc/alloy/config.alloy + - --storage.path=/var/lib/alloy/data + - --server.http.listen-addr=0.0.0.0:12345 + ports: + - containerPort: 12345 + name: http + volumeMounts: + - name: config + mountPath: /etc/alloy + - name: data + mountPath: /var/lib/alloy/data + resources: + limits: + cpu: 200m + memory: 128Mi + requests: + cpu: 50m + memory: 64Mi + volumes: + - name: config + configMap: + name: alloy-config + - name: data + emptyDir: {} diff --git a/deploy/k8s/alloy-rbac.yaml b/deploy/k8s/alloy-rbac.yaml new file mode 100644 index 0000000..0a48dd1 --- /dev/null 
+++ b/deploy/k8s/alloy-rbac.yaml @@ -0,0 +1,30 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: alloy + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: alloy +rules: + - apiGroups: [""] + resources: ["nodes", "nodes/proxy", "services", "endpoints", "pods"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["pods/log"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: alloy +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: alloy +subjects: + - kind: ServiceAccount + name: alloy + namespace: default From 2650771cdb3f4a425ac52a6b86d5dba1b78d110b Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 23 Jun 2026 11:27:42 +0800 Subject: [PATCH 12/12] docs: regenerate swagger and drop stale TODO comment - regenerate AsyncTask swagger responses, keeping only the documented case - remove obsolete 200/400/404/500 response entries from docs/swagger - delete commented-out termsOfService TODO from main.go API annotations --- docs/docs.go | 72 ----------------------------------------------- docs/swagger.json | 72 ----------------------------------------------- docs/swagger.yaml | 42 --------------------------- main.go | 2 -- 4 files changed, 188 deletions(-) diff --git a/docs/docs.go b/docs/docs.go index f674f98..8aa977c 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -332,34 +332,10 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "查询成功", - "schema": { - "allOf": [ - { - "$ref": "#/definitions/network.SuccessResponse" - }, - { - "type": "object", - "properties": { - "payload": { - "$ref": "#/definitions/network.AsyncTaskResultQueryResponse" - } - } - } - ] - } - }, - "400": { "description": "请求参数错误", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } @@ 
-388,40 +364,10 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "查询成功", - "schema": { - "allOf": [ - { - "$ref": "#/definitions/network.SuccessResponse" - }, - { - "type": "object", - "properties": { - "payload": { - "$ref": "#/definitions/network.AsyncTaskResult" - } - } - } - ] - } - }, - "400": { "description": "请求参数错误", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "404": { - "description": "任务不存在", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } @@ -450,28 +396,10 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "任务取消成功", - "schema": { - "$ref": "#/definitions/network.SuccessResponse" - } - }, - "400": { "description": "请求参数错误或任务无法取消", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "404": { - "description": "任务不存在", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } diff --git a/docs/swagger.json b/docs/swagger.json index c6bb033..5c5dc57 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -326,34 +326,10 @@ ], "responses": { "200": { - "description": "查询成功", - "schema": { - "allOf": [ - { - "$ref": "#/definitions/network.SuccessResponse" - }, - { - "type": "object", - "properties": { - "payload": { - "$ref": "#/definitions/network.AsyncTaskResultQueryResponse" - } - } - } - ] - } - }, - "400": { "description": "请求参数错误", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } @@ -382,40 +358,10 @@ ], "responses": { "200": { - "description": "查询成功", - "schema": { - "allOf": [ - { - "$ref": "#/definitions/network.SuccessResponse" - }, - { - "type": "object", - "properties": { - 
"payload": { - "$ref": "#/definitions/network.AsyncTaskResult" - } - } - } - ] - } - }, - "400": { "description": "请求参数错误", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "404": { - "description": "任务不存在", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } @@ -444,28 +390,10 @@ ], "responses": { "200": { - "description": "任务取消成功", - "schema": { - "$ref": "#/definitions/network.SuccessResponse" - } - }, - "400": { "description": "请求参数错误或任务无法取消", "schema": { "$ref": "#/definitions/network.FailureResponse" } - }, - "404": { - "description": "任务不存在", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } - }, - "500": { - "description": "服务器内部错误", - "schema": { - "$ref": "#/definitions/network.FailureResponse" - } } } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 598c8d9..05519e2 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -361,26 +361,9 @@ paths: - application/json responses: "200": - description: 查询成功 - schema: - allOf: - - $ref: '#/definitions/network.SuccessResponse' - - properties: - payload: - $ref: '#/definitions/network.AsyncTaskResult' - type: object - "400": description: 请求参数错误 schema: $ref: '#/definitions/network.FailureResponse' - "404": - description: 任务不存在 - schema: - $ref: '#/definitions/network.FailureResponse' - "500": - description: 服务器内部错误 - schema: - $ref: '#/definitions/network.FailureResponse' summary: 查询异步任务详情 tags: - AsyncTask @@ -399,21 +382,9 @@ paths: - application/json responses: "200": - description: 任务取消成功 - schema: - $ref: '#/definitions/network.SuccessResponse' - "400": description: 请求参数错误或任务无法取消 schema: $ref: '#/definitions/network.FailureResponse' - "404": - description: 任务不存在 - schema: - $ref: '#/definitions/network.FailureResponse' - "500": - description: 服务器内部错误 - schema: - $ref: '#/definitions/network.FailureResponse' summary: 取消异步任务 
tags: - AsyncTask @@ -432,22 +403,9 @@ paths: - application/json responses: "200": - description: 查询成功 - schema: - allOf: - - $ref: '#/definitions/network.SuccessResponse' - - properties: - payload: - $ref: '#/definitions/network.AsyncTaskResultQueryResponse' - type: object - "400": description: 请求参数错误 schema: $ref: '#/definitions/network.FailureResponse' - "500": - description: 服务器内部错误 - schema: - $ref: '#/definitions/network.FailureResponse' summary: 查询异步任务结果 tags: - AsyncTask diff --git a/main.go b/main.go index ea1a710..84fa153 100644 --- a/main.go +++ b/main.go @@ -59,8 +59,6 @@ var ( // @title ModelRT 实时模型服务 API 文档 // @version 1.0 // @description 实时数据计算和模型运行服务的 API 服务 -// TODO termsOfService服务条款待后续优化 -// // @termsOfService http://swagger.io/terms/ // // @contact.name douxu // TODO 修改支持的文档地址