From cccd4becdceb631573b1317fcc8d1a62548ba44f Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 11 May 2026 17:34:27 +0800 Subject: [PATCH] feat: add Loki logging, fix MQ shutdown order, improve realtime tracing - add LokiConfig and batching lokiSyncer for dev-mode direct log push - refactor zap logger to support mode-aware encoding and K8s pod fields - fix RabbitMQ shutdown race: move CloseRabbitProxy to defer so channel closes before connection (prevents 504 error on Ctrl+C) - wrap MsgChan with EventMessage to carry per-cycle trace carrier - create new root OTel span per computation cycle linked to startup span, giving each cycle an independent traceID with startup as reference --- config/config.go | 21 ++- logger/loki_syncer.go | 133 ++++++++++++++++++ logger/zap.go | 96 +++++++++---- main.go | 3 +- mq/publish_up_down_limit_event.go | 21 ++- real-time-data/compute_analyzer.go | 12 +- .../real_time_data_up_down_limit_computing.go | 30 +++- task/worker.go | 4 +- 8 files changed, 266 insertions(+), 54 deletions(-) create mode 100644 logger/loki_syncer.go diff --git a/config/config.go b/config/config.go index c4a69d7..5a87f34 100644 --- a/config/config.go +++ b/config/config.go @@ -56,15 +56,22 @@ type PostgresConfig struct { Password string `mapstructure:"password"` } +// LokiConfig define config struct of loki direct-push (used in development mode) +type LokiConfig struct { + Endpoint string `mapstructure:"endpoint"` // empty disables direct push + Labels map[string]string `mapstructure:"labels"` +} + // LoggerConfig define config struct of zap logger config type LoggerConfig struct { - Mode string `mapstructure:"mode"` - Level string `mapstructure:"level"` - FilePath string `mapstructure:"filepath"` - MaxSize int `mapstructure:"maxsize"` - MaxBackups int `mapstructure:"maxbackups"` - MaxAge int `mapstructure:"maxage"` - Compress bool `mapstructure:"compress"` + Mode string `mapstructure:"mode"` + Level string `mapstructure:"level"` + FilePath string 
`mapstructure:"filepath"` // empty disables file rotation in container modes + MaxSize int `mapstructure:"maxsize"` + MaxBackups int `mapstructure:"maxbackups"` + MaxAge int `mapstructure:"maxage"` + Compress bool `mapstructure:"compress"` + Loki LokiConfig `mapstructure:"loki"` } // RedisConfig define config struct of redis config diff --git a/logger/loki_syncer.go b/logger/loki_syncer.go new file mode 100644 index 0000000..3e3f319 --- /dev/null +++ b/logger/loki_syncer.go @@ -0,0 +1,133 @@ +// Package logger define log struct of modelRT project +package logger + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "strconv" + "sync" + "time" + + "modelRT/config" +) + +type lokiPushRequest struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Stream map[string]string `json:"stream"` + Values [][2]string `json:"values"` +} + +// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them +// to Loki's push API asynchronously. Push errors are never returned to the caller, so an unreachable +// Loki instance never blocks or crashes the application. 
+type lokiSyncer struct { + endpoint string + labels map[string]string + client *http.Client + ch chan string + wg sync.WaitGroup + closeOnce sync.Once +} + +func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer { + // always tag development logs with env=development; caller-supplied labels override if needed + labels := map[string]string{"env": "development"} + for k, v := range lCfg.Labels { + labels[k] = v + } + ls := &lokiSyncer{ + endpoint: lCfg.Endpoint + "/loki/api/v1/push", + labels: labels, + client: &http.Client{Timeout: 5 * time.Second}, + ch: make(chan string, 512), + } + ls.wg.Add(1) + go ls.run() + return ls +} + +func (ls *lokiSyncer) Write(p []byte) (int, error) { + select { + case ls.ch <- string(p): + default: + // channel full: drop the line rather than block the caller + } + return len(p), nil +} + +// Sync flushes remaining buffered lines and shuts down the background goroutine. +// Called by zap.Logger.Sync() at application shutdown. +func (ls *lokiSyncer) Sync() error { + ls.closeOnce.Do(func() { close(ls.ch) }) + ls.wg.Wait() + return nil +} + +func (ls *lokiSyncer) run() { + defer ls.wg.Done() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + var batch []string + flush := func() { + if len(batch) == 0 { + return + } + ls.push(batch) + batch = batch[:0] + } + + for { + select { + case line, ok := <-ls.ch: + if !ok { + flush() + return + } + batch = append(batch, line) + if len(batch) >= 100 { + flush() + } + case <-ticker.C: + flush() + } + } +} + +func (ls *lokiSyncer) push(lines []string) { + ts := strconv.FormatInt(time.Now().UnixNano(), 10) + values := make([][2]string, len(lines)) + for i, line := range lines { + values[i] = [2]string{ts, line} + } + + body, err := json.Marshal(lokiPushRequest{ + Streams: []lokiStream{{Stream: ls.labels, Values: values}}, + }) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, err := 
http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := ls.client.Do(req) + if err != nil { + fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err) + return + } + defer resp.Body.Close() +} diff --git a/logger/zap.go b/logger/zap.go index 5e65db7..52d74eb 100644 --- a/logger/zap.go +++ b/logger/zap.go @@ -21,53 +21,89 @@ var ( _globalLogger *zap.Logger ) -// getEncoder responsible for setting the log format for encoding -func getEncoder() zapcore.Encoder { - encoderConfig := zap.NewProductionEncoderConfig() - // serialization time eg:2006-01-02 15:04:05 - encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") - encoderConfig.TimeKey = "time" - encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder - encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder - return zapcore.NewJSONEncoder(encoderConfig) +// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder +// for container modes (parseable by Promtail pipeline_stages). 
+func getEncoder(mode string) zapcore.Encoder { + cfg := zap.NewProductionEncoderConfig() + cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") + cfg.TimeKey = "time" + cfg.EncodeCaller = zapcore.ShortCallerEncoder + if mode == constants.DevelopmentLogMode { + cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder + return zapcore.NewConsoleEncoder(cfg) + } + cfg.EncodeLevel = zapcore.CapitalLevelEncoder + return zapcore.NewJSONEncoder(cfg) } -// getLogWriter responsible for setting the location of log storage -func getLogWriter(mode, filename string, maxsize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer { - dateStr := time.Now().Format("2006-01-02 15:04:05") - finalFilename := fmt.Sprintf(filename, dateStr) - lumberJackLogger := &lumberjack.Logger{ - Filename: finalFilename, // log file position - MaxSize: maxsize, // log file maxsize - MaxAge: maxAge, // maximum number of day files retained - MaxBackups: maxBackup, // maximum number of old files retained - Compress: compress, // whether to compress +// getWriteSyncer returns write targets based on mode: +// - development: stdout + optional Loki direct-push (when loki.endpoint is set) +// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set) +func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer { + stdout := zapcore.AddSync(os.Stdout) + + if lCfg.Mode == constants.DevelopmentLogMode { + if lCfg.Loki.Endpoint == "" { + return stdout + } + return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki)) } - syncConsole := zapcore.AddSync(os.Stderr) - if mode == constants.DevelopmentLogMode { - return syncConsole + syncers := []zapcore.WriteSyncer{stdout} + if lCfg.FilePath != "" { + dateStr := time.Now().Format("2006-01-02 15:04:05") + syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{ + Filename: fmt.Sprintf(lCfg.FilePath, dateStr), + MaxSize: lCfg.MaxSize, + MaxAge: lCfg.MaxAge, + MaxBackups: lCfg.MaxBackups, + Compress: 
lCfg.Compress, + })) } + return zapcore.NewMultiWriteSyncer(syncers...) +} - syncFile := zapcore.AddSync(lumberJackLogger) - return zapcore.NewMultiWriteSyncer(syncFile, syncConsole) +// containerFields reads K8s Downward API environment variables and returns them as global zap fields. +// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node. +// Inject them in the Deployment manifest: +// +// env: +// - name: K8S_NAMESPACE +// valueFrom: {fieldRef: {fieldPath: metadata.namespace}} +// - name: K8S_NODE_NAME +// valueFrom: {fieldRef: {fieldPath: spec.nodeName}} +func containerFields() []zap.Field { + var fields []zap.Field + // HOSTNAME is automatically set to the pod name by Kubernetes. + if pod := os.Getenv("HOSTNAME"); pod != "" { + fields = append(fields, zap.String("pod", pod)) + } + if ns := os.Getenv("K8S_NAMESPACE"); ns != "" { + fields = append(fields, zap.String("namespace", ns)) + } + if node := os.Getenv("K8S_NODE_NAME"); node != "" { + fields = append(fields, zap.String("node", node)) + } + return fields } // initLogger return successfully initialized zap logger func initLogger(lCfg config.LoggerConfig) *zap.Logger { - writeSyncer := getLogWriter(lCfg.Mode, lCfg.FilePath, lCfg.MaxSize, lCfg.MaxBackups, lCfg.MaxAge, lCfg.Compress) - encoder := getEncoder() + writeSyncer := getWriteSyncer(lCfg) + encoder := getEncoder(lCfg.Mode) l := new(zapcore.Level) - err := l.UnmarshalText([]byte(lCfg.Level)) - if err != nil { + if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil { panic(err) } core := zapcore.NewCore(encoder, writeSyncer, l) - logger := zap.New(core, zap.AddCaller()) + opts := []zap.Option{zap.AddCaller()} + if lCfg.Mode != constants.DevelopmentLogMode { + opts = append(opts, zap.Fields(containerFields()...)) + } + logger := zap.New(core, opts...) 
- // 替换全局日志实例 zap.ReplaceGlobals(logger) return logger } diff --git a/main.go b/main.go index 4121a1f..f120c6f 100644 --- a/main.go +++ b/main.go @@ -174,6 +174,7 @@ func main() { // init rabbitmq connection mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig) + defer mq.CloseRabbitProxy() // init async task worker taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient) @@ -284,7 +285,6 @@ func main() { if err := server.Shutdown(context.Background()); err != nil { logger.Error(ctx, "shutdown serverError", "err", err) } - mq.CloseRabbitProxy() logger.Info(ctx, "resources cleaned up, exiting") }() @@ -300,4 +300,3 @@ func main() { } } } - diff --git a/mq/publish_up_down_limit_event.go b/mq/publish_up_down_limit_event.go index 65879d7..07a122b 100644 --- a/mq/publish_up_down_limit_event.go +++ b/mq/publish_up_down_limit_event.go @@ -41,11 +41,17 @@ func (c amqpHeaderCarrier) Keys() []string { var _ propagation.TextMapCarrier = amqpHeaderCarrier{} +// EventMessage wraps an EventRecord with the trace context of the computation cycle that produced it. 
+type EventMessage struct { + Record *event.EventRecord + TraceCarrier map[string]string +} + // MsgChan define variable of channel to store messages that need to be sent to rabbitMQ -var MsgChan chan *event.EventRecord +var MsgChan chan *EventMessage func init() { - MsgChan = make(chan *event.EventRecord, 10000) + MsgChan = make(chan *EventMessage, 10000) } func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { @@ -107,7 +113,7 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { } // PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ -func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) { +func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) { channel, err := initUpDownLimitEventChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) @@ -138,13 +144,15 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.Eve logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel") channel.Close() return - case eventRecord, ok := <-msgChan: + case msg, ok := <-msgChan: if !ok { logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop") channel.Close() return } + eventRecord := msg.Record + // TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key recordBytes, err := json.Marshal(eventRecord) if err != nil { @@ -152,9 +160,10 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.Eve continue } - // inject current trace context into AMQP headers so eventRT can restore the span chain + // restore computation cycle trace context so the AMQP message carries the correct parent span + msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) headers := amqp.Table{} - otel.GetTextMapPropagator().Inject(ctx, 
amqpHeaderCarrier(headers)) + otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers)) // send event alarm message to rabbitMQ queue routingKey := eventRecord.Category diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index d65a343..864fd2d 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -11,6 +11,9 @@ import ( "modelRT/logger" "modelRT/mq" "modelRT/mq/event" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) // RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering @@ -146,6 +149,9 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE } } + carrier := make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + for breachType, trigger := range breachTriggers { // trigger Action command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action) @@ -155,7 +161,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE logger.Error(ctx, "trigger event action failed", "error", err) return } - mq.MsgChan <- eventRecord + mq.MsgChan <- &mq.EventMessage{Record: eventRecord, TraceCarrier: carrier} } } @@ -330,7 +336,9 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE logger.Error(ctx, "trigger event action failed", "error", err) return } - mq.MsgChan <- eventRecord + carrier := make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + mq.MsgChan <- &mq.EventMessage{Record: eventRecord, TraceCarrier: carrier} return } } diff --git a/real-time-data/real_time_data_up_down_limit_computing.go b/real-time-data/real_time_data_up_down_limit_computing.go index 53acc8d..24a93e9 100644 --- a/real-time-data/real_time_data_up_down_limit_computing.go +++ b/real-time-data/real_time_data_up_down_limit_computing.go @@ -14,6 +14,10 @@ import ( "modelRT/network" 
"modelRT/orm" "modelRT/util" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + oteltrace "go.opentelemetry.io/otel/trace" ) var ( @@ -205,25 +209,41 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") return case <-ticker.C: - queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + // start a new root span for this computation cycle, linked to the startup span + startupSpanCtx := oteltrace.SpanFromContext(ctx).SpanContext() + cycleCtx, cycleSpan := otel.Tracer("modelRT/realtime").Start( + context.Background(), + "realtime.compute_cycle", + oteltrace.WithNewRoot(), + oteltrace.WithLinks(oteltrace.Link{SpanContext: startupSpanCtx}), + oteltrace.WithAttributes( + attribute.String("measurement_uuid", uuid), + attribute.String("query_key", conf.QueryKey), + ), + ) + + queryCtx, cancel := context.WithTimeout(cycleCtx, 2*time.Second) members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize) cancel() if err != nil { - logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) + logger.Error(cycleCtx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) + cycleSpan.End() continue } realTimedatas := util.ConvertZSetMembersToFloat64(members) if len(realTimedatas) == 0 { - logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey) + logger.Info(cycleCtx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey) + cycleSpan.End() continue } if conf.Analyzer != nil { - conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas) + conf.Analyzer.AnalyzeAndTriggerEvent(cycleCtx, conf, realTimedatas) } else { - logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid) + logger.Error(cycleCtx, "analyzer is not initialized for this measurement", "uuid", uuid) 
} + cycleSpan.End() } } } diff --git a/task/worker.go b/task/worker.go index 20795cf..b51e00d 100644 --- a/task/worker.go +++ b/task/worker.go @@ -537,11 +537,11 @@ func (w *TaskWorker) Stop() error { // Close channel if w.ch != nil { if err := w.ch.Close(); err != nil { - logger.Error(w.ctx, "Failed to close channel", "error", err) + logger.Error(w.ctx, "failed to close channel", "error", err) } } - logger.Info(w.ctx, "Task worker stopped") + logger.Info(w.ctx, "task worker stopped") return nil }