feat: add Loki logging, fix MQ shutdown order, improve realtime tracing

- add LokiConfig and batching lokiSyncer for dev-mode direct log push
  - refactor zap logger to support mode-aware encoding and K8s pod fields
  - fix RabbitMQ shutdown race: move CloseRabbitProxy to defer so channel
    closes before connection (prevents 504 error on Ctrl+C)
  - wrap MsgChan with EventMessage to carry per-cycle trace carrier
  - create new root OTel span per computation cycle linked to startup span,
    giving each cycle an independent traceID with startup as reference
This commit is contained in:
douxu 2026-05-11 17:34:27 +08:00
parent 1dd8491440
commit cccd4becdc
8 changed files with 266 additions and 54 deletions

View File

@ -56,15 +56,22 @@ type PostgresConfig struct {
Password string `mapstructure:"password"`
}
// LokiConfig define config struct of loki direct-push (used in development mode)
type LokiConfig struct {
Endpoint string `mapstructure:"endpoint"` // empty disables direct push
Labels map[string]string `mapstructure:"labels"`
}
// LoggerConfig define config struct of zap logger config
type LoggerConfig struct {
Mode string `mapstructure:"mode"`
Level string `mapstructure:"level"`
FilePath string `mapstructure:"filepath"`
MaxSize int `mapstructure:"maxsize"`
MaxBackups int `mapstructure:"maxbackups"`
MaxAge int `mapstructure:"maxage"`
Compress bool `mapstructure:"compress"`
Mode string `mapstructure:"mode"`
Level string `mapstructure:"level"`
FilePath string `mapstructure:"filepath"` // empty disables file rotation in container modes
MaxSize int `mapstructure:"maxsize"`
MaxBackups int `mapstructure:"maxbackups"`
MaxAge int `mapstructure:"maxage"`
Compress bool `mapstructure:"compress"`
Loki LokiConfig `mapstructure:"loki"`
}
// RedisConfig define config struct of redis config

133
logger/loki_syncer.go Normal file
View File

@ -0,0 +1,133 @@
// Package logger define log struct of modelRT project
package logger
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"sync"
"time"
"modelRT/config"
)
type lokiPushRequest struct {
Streams []lokiStream `json:"streams"`
}
type lokiStream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
}
// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them
// to Loki's push API asynchronously. Errors are silently dropped so a unreachable
// Loki instance never blocks or crashes the application.
type lokiSyncer struct {
endpoint string
labels map[string]string
client *http.Client
ch chan string
wg sync.WaitGroup
closeOnce sync.Once
}
func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
// always tag development logs with env=development; caller-supplied labels override if needed
labels := map[string]string{"env": "development"}
for k, v := range lCfg.Labels {
labels[k] = v
}
ls := &lokiSyncer{
endpoint: lCfg.Endpoint + "/loki/api/v1/push",
labels: labels,
client: &http.Client{Timeout: 5 * time.Second},
ch: make(chan string, 512),
}
ls.wg.Add(1)
go ls.run()
return ls
}
func (ls *lokiSyncer) Write(p []byte) (int, error) {
select {
case ls.ch <- string(p):
default:
// channel full: drop the line rather than block the caller
}
return len(p), nil
}
// Sync flushes remaining buffered lines and shuts down the background goroutine.
// Called by zap.Logger.Sync() at application shutdown.
func (ls *lokiSyncer) Sync() error {
ls.closeOnce.Do(func() { close(ls.ch) })
ls.wg.Wait()
return nil
}
func (ls *lokiSyncer) run() {
defer ls.wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var batch []string
flush := func() {
if len(batch) == 0 {
return
}
ls.push(batch)
batch = batch[:0]
}
for {
select {
case line, ok := <-ls.ch:
if !ok {
flush()
return
}
batch = append(batch, line)
if len(batch) >= 100 {
flush()
}
case <-ticker.C:
flush()
}
}
}
func (ls *lokiSyncer) push(lines []string) {
ts := strconv.FormatInt(time.Now().UnixNano(), 10)
values := make([][2]string, len(lines))
for i, line := range lines {
values[i] = [2]string{ts, line}
}
body, err := json.Marshal(lokiPushRequest{
Streams: []lokiStream{{Stream: ls.labels, Values: values}},
})
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := ls.client.Do(req)
if err != nil {
fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err)
return
}
defer resp.Body.Close()
}

View File

@ -21,53 +21,89 @@ var (
_globalLogger *zap.Logger
)
// getEncoder responsible for setting the log format for encoding
func getEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
// serialization time eg:2006-01-02 15:04:05
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
encoderConfig.TimeKey = "time"
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
return zapcore.NewJSONEncoder(encoderConfig)
// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder
// for container modes (parseable by Promtail pipeline_stages).
func getEncoder(mode string) zapcore.Encoder {
cfg := zap.NewProductionEncoderConfig()
cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
cfg.TimeKey = "time"
cfg.EncodeCaller = zapcore.ShortCallerEncoder
if mode == constants.DevelopmentLogMode {
cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
return zapcore.NewConsoleEncoder(cfg)
}
cfg.EncodeLevel = zapcore.CapitalLevelEncoder
return zapcore.NewJSONEncoder(cfg)
}
// getLogWriter responsible for setting the location of log storage
func getLogWriter(mode, filename string, maxsize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer {
dateStr := time.Now().Format("2006-01-02 15:04:05")
finalFilename := fmt.Sprintf(filename, dateStr)
lumberJackLogger := &lumberjack.Logger{
Filename: finalFilename, // log file position
MaxSize: maxsize, // log file maxsize
MaxAge: maxAge, // maximum number of day files retained
MaxBackups: maxBackup, // maximum number of old files retained
Compress: compress, // whether to compress
// getWriteSyncer returns write targets based on mode:
// - development: stdout + optional Loki direct-push (when loki.endpoint is set)
// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set)
func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer {
stdout := zapcore.AddSync(os.Stdout)
if lCfg.Mode == constants.DevelopmentLogMode {
if lCfg.Loki.Endpoint == "" {
return stdout
}
return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki))
}
syncConsole := zapcore.AddSync(os.Stderr)
if mode == constants.DevelopmentLogMode {
return syncConsole
syncers := []zapcore.WriteSyncer{stdout}
if lCfg.FilePath != "" {
dateStr := time.Now().Format("2006-01-02 15:04:05")
syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{
Filename: fmt.Sprintf(lCfg.FilePath, dateStr),
MaxSize: lCfg.MaxSize,
MaxAge: lCfg.MaxAge,
MaxBackups: lCfg.MaxBackups,
Compress: lCfg.Compress,
}))
}
return zapcore.NewMultiWriteSyncer(syncers...)
}
syncFile := zapcore.AddSync(lumberJackLogger)
return zapcore.NewMultiWriteSyncer(syncFile, syncConsole)
// containerFields reads K8s Downward API environment variables and returns them as global zap fields.
// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node.
// Inject them in the Deployment manifest:
//
// env:
// - name: K8S_NAMESPACE
// valueFrom: {fieldRef: {fieldPath: metadata.namespace}}
// - name: K8S_NODE_NAME
// valueFrom: {fieldRef: {fieldPath: spec.nodeName}}
func containerFields() []zap.Field {
var fields []zap.Field
// HOSTNAME is automatically set to the pod name by Kubernetes.
if pod := os.Getenv("HOSTNAME"); pod != "" {
fields = append(fields, zap.String("pod", pod))
}
if ns := os.Getenv("K8S_NAMESPACE"); ns != "" {
fields = append(fields, zap.String("namespace", ns))
}
if node := os.Getenv("K8S_NODE_NAME"); node != "" {
fields = append(fields, zap.String("node", node))
}
return fields
}
// initLogger return successfully initialized zap logger
func initLogger(lCfg config.LoggerConfig) *zap.Logger {
writeSyncer := getLogWriter(lCfg.Mode, lCfg.FilePath, lCfg.MaxSize, lCfg.MaxBackups, lCfg.MaxAge, lCfg.Compress)
encoder := getEncoder()
writeSyncer := getWriteSyncer(lCfg)
encoder := getEncoder(lCfg.Mode)
l := new(zapcore.Level)
err := l.UnmarshalText([]byte(lCfg.Level))
if err != nil {
if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil {
panic(err)
}
core := zapcore.NewCore(encoder, writeSyncer, l)
logger := zap.New(core, zap.AddCaller())
opts := []zap.Option{zap.AddCaller()}
if lCfg.Mode != constants.DevelopmentLogMode {
opts = append(opts, zap.Fields(containerFields()...))
}
logger := zap.New(core, opts...)
// 替换全局日志实例
zap.ReplaceGlobals(logger)
return logger
}

View File

@ -174,6 +174,7 @@ func main() {
// init rabbitmq connection
mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig)
defer mq.CloseRabbitProxy()
// init async task worker
taskWorker, err := task.InitTaskWorker(ctx, modelRTConfig, postgresDBClient)
@ -284,7 +285,6 @@ func main() {
if err := server.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "shutdown serverError", "err", err)
}
mq.CloseRabbitProxy()
logger.Info(ctx, "resources cleaned up, exiting")
}()
@ -300,4 +300,3 @@ func main() {
}
}
}

View File

@ -41,11 +41,17 @@ func (c amqpHeaderCarrier) Keys() []string {
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}
// EventMessage wraps an EventRecord with the trace context of the computation cycle that produced it.
type EventMessage struct {
Record *event.EventRecord
TraceCarrier map[string]string
}
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
var MsgChan chan *event.EventRecord
var MsgChan chan *EventMessage
func init() {
MsgChan = make(chan *event.EventRecord, 10000)
MsgChan = make(chan *EventMessage, 10000)
}
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
@ -107,7 +113,7 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
}
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) {
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) {
channel, err := initUpDownLimitEventChannel(ctx)
if err != nil {
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
@ -138,13 +144,15 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.Eve
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
channel.Close()
return
case eventRecord, ok := <-msgChan:
case msg, ok := <-msgChan:
if !ok {
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
channel.Close()
return
}
eventRecord := msg.Record
// TODO 将消息的序列化移动到发送之前以便使用eventRecord的category来作为routing key
recordBytes, err := json.Marshal(eventRecord)
if err != nil {
@ -152,9 +160,10 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.Eve
continue
}
// inject current trace context into AMQP headers so eventRT can restore the span chain
// restore computation cycle trace context so the AMQP message carries the correct parent span
msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
headers := amqp.Table{}
otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers))
otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers))
// send event alarm message to rabbitMQ queue
routingKey := eventRecord.Category

View File

@ -11,6 +11,9 @@ import (
"modelRT/logger"
"modelRT/mq"
"modelRT/mq/event"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
@ -146,6 +149,9 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE
}
}
carrier := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier))
for breachType, trigger := range breachTriggers {
// trigger Action
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
@ -155,7 +161,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE
logger.Error(ctx, "trigger event action failed", "error", err)
return
}
mq.MsgChan <- eventRecord
mq.MsgChan <- &mq.EventMessage{Record: eventRecord, TraceCarrier: carrier}
}
}
@ -330,7 +336,9 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
logger.Error(ctx, "trigger event action failed", "error", err)
return
}
mq.MsgChan <- eventRecord
carrier := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier))
mq.MsgChan <- &mq.EventMessage{Record: eventRecord, TraceCarrier: carrier}
return
}
}

View File

@ -14,6 +14,10 @@ import (
"modelRT/network"
"modelRT/orm"
"modelRT/util"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
)
var (
@ -205,25 +209,41 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) {
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
return
case <-ticker.C:
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
// start a new root span for this computation cycle, linked to the startup span
startupSpanCtx := oteltrace.SpanFromContext(ctx).SpanContext()
cycleCtx, cycleSpan := otel.Tracer("modelRT/realtime").Start(
context.Background(),
"realtime.compute_cycle",
oteltrace.WithNewRoot(),
oteltrace.WithLinks(oteltrace.Link{SpanContext: startupSpanCtx}),
oteltrace.WithAttributes(
attribute.String("measurement_uuid", uuid),
attribute.String("query_key", conf.QueryKey),
),
)
queryCtx, cancel := context.WithTimeout(cycleCtx, 2*time.Second)
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
cancel()
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
logger.Error(cycleCtx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
cycleSpan.End()
continue
}
realTimedatas := util.ConvertZSetMembersToFloat64(members)
if len(realTimedatas) == 0 {
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
logger.Info(cycleCtx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
cycleSpan.End()
continue
}
if conf.Analyzer != nil {
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
conf.Analyzer.AnalyzeAndTriggerEvent(cycleCtx, conf, realTimedatas)
} else {
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
logger.Error(cycleCtx, "analyzer is not initialized for this measurement", "uuid", uuid)
}
cycleSpan.End()
}
}
}

View File

@ -537,11 +537,11 @@ func (w *TaskWorker) Stop() error {
// Close channel
if w.ch != nil {
if err := w.ch.Close(); err != nil {
logger.Error(w.ctx, "Failed to close channel", "error", err)
logger.Error(w.ctx, "failed to close channel", "error", err)
}
}
logger.Info(w.ctx, "Task worker stopped")
logger.Info(w.ctx, "task worker stopped")
return nil
}