refactor: sync logger improvements from modelrt to eventrt

- add LokiConfig struct and Loki field to LoggerConfig for dev direct-push
  - replace getEncoder/getLogWriter with mode-aware encoder and getWriteSyncer
    (colored console in dev, JSON in container; stdout instead of stderr)
  - add containerFields() to inject K8s pod/namespace/node as global log fields
  - add loki_syncer.go: async batched push to Loki with 512-entry channel
  - introduce makeLogFieldsSkip/getLoggerCallerInfoSkip for wrapper call-frame support
  - expose ErrorSkip/WarnSkip/InfoSkip facade functions for skip-frame logging
  - add loki.endpoint placeholder to eventrt-configmap.yaml for self-documentation
This commit is contained in:
douxu 2026-06-08 13:58:48 +08:00
parent 04d81cedce
commit 4c57c37c26
6 changed files with 246 additions and 41 deletions

View File

@ -8,15 +8,22 @@ import (
"github.com/spf13/viper"
)
// LokiConfig define config struct of loki direct-push (used in development mode)
type LokiConfig struct {
Endpoint string `mapstructure:"endpoint"` // empty disables direct push
Labels map[string]string `mapstructure:"labels"`
}
// LoggerConfig define config struct of zap logger config
type LoggerConfig struct {
Mode string `mapstructure:"mode"`
Level string `mapstructure:"level"`
FilePath string `mapstructure:"filepath"`
MaxSize int `mapstructure:"maxsize"`
MaxBackups int `mapstructure:"maxbackups"`
MaxAge int `mapstructure:"maxage"`
Compress bool `mapstructure:"compress"`
Mode string `mapstructure:"mode"`
Level string `mapstructure:"level"`
FilePath string `mapstructure:"filepath"` // empty disables file rotation in container modes
MaxSize int `mapstructure:"maxsize"`
MaxBackups int `mapstructure:"maxbackups"`
MaxAge int `mapstructure:"maxage"`
Compress bool `mapstructure:"compress"`
Loki LokiConfig `mapstructure:"loki"`
}
// MongoDBConfig define config struct of mongoDB config

View File

@ -33,6 +33,8 @@ data:
maxbackups: 5
maxage: 30
compress: false
loki:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
service:
service_addr: ":8081"

View File

@ -39,11 +39,30 @@ func Error(ctx context.Context, msg string, kv ...any) {
}
func (f *facade) log(ctx context.Context, lvl zapcore.Level, msg string, kv ...any) {
fields := makeLogFields(ctx, kv...)
f.logSkip(ctx, lvl, 0, msg, kv...)
}
func (f *facade) logSkip(ctx context.Context, lvl zapcore.Level, extraSkip int, msg string, kv ...any) {
fields := makeLogFieldsSkip(ctx, extraSkip, kv...)
ce := f._logger.Check(lvl, msg)
ce.Write(fields...)
}
// ErrorSkip logs at error level with extra caller skip frames for wrapper functions.
func ErrorSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.ErrorLevel, extraSkip, msg, kv...)
}
// WarnSkip logs at warn level with extra caller skip frames for wrapper functions.
func WarnSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.WarnLevel, extraSkip, msg, kv...)
}
// InfoSkip logs at info level with extra caller skip frames for wrapper functions.
func InfoSkip(ctx context.Context, extraSkip int, msg string, kv ...any) {
logFacade().logSkip(ctx, zapcore.InfoLevel, extraSkip, msg, kv...)
}
func logFacade() *facade {
fOnce.Do(func() {
f = &facade{

View File

@ -47,6 +47,10 @@ func (l *logger) log(lvl zapcore.Level, msg string, kv ...any) {
}
func makeLogFields(ctx context.Context, kv ...any) []zap.Field {
return makeLogFieldsSkip(ctx, 0, kv...)
}
func makeLogFieldsSkip(ctx context.Context, extraSkip int, kv ...any) []zap.Field {
if len(kv)%2 != 0 {
kv = append(kv, "unknown")
}
@ -56,7 +60,7 @@ func makeLogFields(ctx context.Context, kv ...any) []zap.Field {
spanID := spanCtx.SpanID().String()
kv = append(kv, "traceID", traceID, "spanID", spanID)
funcName, file, line := getLoggerCallerInfo()
funcName, file, line := getLoggerCallerInfoSkip(extraSkip)
kv = append(kv, "func", funcName, "file", file, "line", line)
fields := make([]zap.Field, 0, len(kv)/2)
for i := 0; i < len(kv); i += 2 {
@ -84,9 +88,14 @@ func makeLogFields(ctx context.Context, kv ...any) []zap.Field {
return fields
}
// getLoggerCallerInfo define func of return log caller information、method name、file name、line number
// getLoggerCallerInfo returns caller info at a fixed depth for the standard facade call chain.
func getLoggerCallerInfo() (funcName, file string, line int) {
pc, file, line, ok := runtime.Caller(4)
return getLoggerCallerInfoSkip(0)
}
// getLoggerCallerInfoSkip returns caller info with additional skip frames beyond the standard depth.
func getLoggerCallerInfoSkip(extraSkip int) (funcName, file string, line int) {
pc, file, line, ok := runtime.Caller(4 + extraSkip)
if !ok {
return
}

132
logger/loki_syncer.go Normal file
View File

@ -0,0 +1,132 @@
// Package logger define log struct of eventRT project
package logger
import (
"bytes"
"context"
"encoding/json"
"fmt"
"maps"
"net/http"
"os"
"strconv"
"sync"
"time"
"eventRT/config"
)
type lokiPushRequest struct {
Streams []lokiStream `json:"streams"`
}
type lokiStream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
}
// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them
// to Loki's push API asynchronously. Errors are silently dropped so a unreachable
// Loki instance never blocks or crashes the application.
type lokiSyncer struct {
endpoint string
labels map[string]string
client *http.Client
ch chan string
wg sync.WaitGroup
closeOnce sync.Once
}
func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
// always tag development logs with env=development; caller-supplied labels override if needed
labels := map[string]string{"env": "development"}
maps.Copy(labels, lCfg.Labels)
ls := &lokiSyncer{
endpoint: lCfg.Endpoint + "/loki/api/v1/push",
labels: labels,
client: &http.Client{Timeout: 5 * time.Second},
ch: make(chan string, 512),
}
ls.wg.Add(1)
go ls.run()
return ls
}
func (ls *lokiSyncer) Write(p []byte) (int, error) {
select {
case ls.ch <- string(p):
default:
// channel full: drop the line rather than block the caller
}
return len(p), nil
}
// Sync flushes remaining buffered lines and shuts down the background goroutine.
// Called by zap.Logger.Sync() at application shutdown.
func (ls *lokiSyncer) Sync() error {
ls.closeOnce.Do(func() { close(ls.ch) })
ls.wg.Wait()
return nil
}
func (ls *lokiSyncer) run() {
defer ls.wg.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var batch []string
flush := func() {
if len(batch) == 0 {
return
}
ls.push(batch)
batch = batch[:0]
}
for {
select {
case line, ok := <-ls.ch:
if !ok {
flush()
return
}
batch = append(batch, line)
if len(batch) >= 100 {
flush()
}
case <-ticker.C:
flush()
}
}
}
func (ls *lokiSyncer) push(lines []string) {
ts := strconv.FormatInt(time.Now().UnixNano(), 10)
values := make([][2]string, len(lines))
for i, line := range lines {
values[i] = [2]string{ts, line}
}
body, err := json.Marshal(lokiPushRequest{
Streams: []lokiStream{{Stream: ls.labels, Values: values}},
})
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := ls.client.Do(req)
if err != nil {
fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err)
return
}
defer resp.Body.Close()
}

View File

@ -21,53 +21,89 @@ var (
_globalLogger *zap.Logger
)
// getEncoder responsible for setting the log format for encoding
func getEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
// serialization time eg:2006-01-02 15:04:05
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
encoderConfig.TimeKey = "time"
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
return zapcore.NewJSONEncoder(encoderConfig)
// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder
// for container modes (parseable by Promtail pipeline_stages).
func getEncoder(mode string) zapcore.Encoder {
cfg := zap.NewProductionEncoderConfig()
cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
cfg.TimeKey = "time"
cfg.EncodeCaller = zapcore.ShortCallerEncoder
if mode == constants.DevelopmentLogMode {
cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
return zapcore.NewConsoleEncoder(cfg)
}
cfg.EncodeLevel = zapcore.CapitalLevelEncoder
return zapcore.NewJSONEncoder(cfg)
}
// getLogWriter responsible for setting the location of log storage
func getLogWriter(mode, filename string, maxsize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer {
dateStr := time.Now().Format("2006-01-02 15:04:05")
finalFilename := fmt.Sprintf(filename, dateStr)
lumberJackLogger := &lumberjack.Logger{
Filename: finalFilename, // log file position
MaxSize: maxsize, // log file maxsize
MaxAge: maxAge, // maximum number of day files retained
MaxBackups: maxBackup, // maximum number of old files retained
Compress: compress, // whether to compress
// getWriteSyncer returns write targets based on mode:
// - development: stdout + optional Loki direct-push (when loki.endpoint is set)
// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set)
func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer {
stdout := zapcore.AddSync(os.Stdout)
if lCfg.Mode == constants.DevelopmentLogMode {
if lCfg.Loki.Endpoint == "" {
return stdout
}
return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki))
}
syncConsole := zapcore.AddSync(os.Stderr)
if mode == constants.DevelopmentLogMode {
return syncConsole
syncers := []zapcore.WriteSyncer{stdout}
if lCfg.FilePath != "" {
dateStr := time.Now().Format("2006-01-02 15:04:05")
syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{
Filename: fmt.Sprintf(lCfg.FilePath, dateStr),
MaxSize: lCfg.MaxSize,
MaxAge: lCfg.MaxAge,
MaxBackups: lCfg.MaxBackups,
Compress: lCfg.Compress,
}))
}
return zapcore.NewMultiWriteSyncer(syncers...)
}
syncFile := zapcore.AddSync(lumberJackLogger)
return zapcore.NewMultiWriteSyncer(syncFile, syncConsole)
// containerFields reads K8s Downward API environment variables and returns them as global zap fields.
// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node.
// Inject them in the Deployment manifest:
//
// env:
// - name: K8S_NAMESPACE
// valueFrom: {fieldRef: {fieldPath: metadata.namespace}}
// - name: K8S_NODE_NAME
// valueFrom: {fieldRef: {fieldPath: spec.nodeName}}
func containerFields() []zap.Field {
var fields []zap.Field
// HOSTNAME is automatically set to the pod name by Kubernetes.
if pod := os.Getenv("HOSTNAME"); pod != "" {
fields = append(fields, zap.String("pod", pod))
}
if ns := os.Getenv("K8S_NAMESPACE"); ns != "" {
fields = append(fields, zap.String("namespace", ns))
}
if node := os.Getenv("K8S_NODE_NAME"); node != "" {
fields = append(fields, zap.String("node", node))
}
return fields
}
// initLogger return successfully initialized zap logger
func initLogger(lCfg config.LoggerConfig) *zap.Logger {
writeSyncer := getLogWriter(lCfg.Mode, lCfg.FilePath, lCfg.MaxSize, lCfg.MaxBackups, lCfg.MaxAge, lCfg.Compress)
encoder := getEncoder()
writeSyncer := getWriteSyncer(lCfg)
encoder := getEncoder(lCfg.Mode)
l := new(zapcore.Level)
err := l.UnmarshalText([]byte(lCfg.Level))
if err != nil {
if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil {
panic(err)
}
core := zapcore.NewCore(encoder, writeSyncer, l)
logger := zap.New(core, zap.AddCaller())
opts := []zap.Option{zap.AddCaller()}
if lCfg.Mode != constants.DevelopmentLogMode {
opts = append(opts, zap.Fields(containerFields()...))
}
logger := zap.New(core, opts...)
// 替换全局日志实例
zap.ReplaceGlobals(logger)
return logger
}