diff --git a/config/config.go b/config/config.go index 45602ff..18ca080 100644 --- a/config/config.go +++ b/config/config.go @@ -8,15 +8,22 @@ import ( "github.com/spf13/viper" ) +// LokiConfig define config struct of loki direct-push (used in development mode) +type LokiConfig struct { + Endpoint string `mapstructure:"endpoint"` // empty disables direct push + Labels map[string]string `mapstructure:"labels"` +} + // LoggerConfig define config struct of zap logger config type LoggerConfig struct { - Mode string `mapstructure:"mode"` - Level string `mapstructure:"level"` - FilePath string `mapstructure:"filepath"` - MaxSize int `mapstructure:"maxsize"` - MaxBackups int `mapstructure:"maxbackups"` - MaxAge int `mapstructure:"maxage"` - Compress bool `mapstructure:"compress"` + Mode string `mapstructure:"mode"` + Level string `mapstructure:"level"` + FilePath string `mapstructure:"filepath"` // empty disables file rotation in container modes + MaxSize int `mapstructure:"maxsize"` + MaxBackups int `mapstructure:"maxbackups"` + MaxAge int `mapstructure:"maxage"` + Compress bool `mapstructure:"compress"` + Loki LokiConfig `mapstructure:"loki"` } // MongoDBConfig define config struct of mongoDB config diff --git a/deploy/k8s/eventrt-configmap.yaml b/deploy/k8s/eventrt-configmap.yaml index c9a1589..5375875 100644 --- a/deploy/k8s/eventrt-configmap.yaml +++ b/deploy/k8s/eventrt-configmap.yaml @@ -33,6 +33,8 @@ data: maxbackups: 5 maxage: 30 compress: false + loki: + endpoint: "" # Promtail handles log collection in K8s, direct push disabled service: service_addr: ":8081" diff --git a/logger/facede.go b/logger/facede.go index 2ed193a..cf88ccf 100644 --- a/logger/facede.go +++ b/logger/facede.go @@ -39,11 +39,30 @@ func Error(ctx context.Context, msg string, kv ...any) { } func (f *facade) log(ctx context.Context, lvl zapcore.Level, msg string, kv ...any) { - fields := makeLogFields(ctx, kv...) + f.logSkip(ctx, lvl, 0, msg, kv...) +} + +func (f *facade) logSkip(ctx context.Context, lvl zapcore.Level, extraSkip int, msg string, kv ...any) { + fields := makeLogFieldsSkip(ctx, extraSkip, kv...) ce := f._logger.Check(lvl, msg) ce.Write(fields...) } +// ErrorSkip logs at error level with extra caller skip frames for wrapper functions. +func ErrorSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.ErrorLevel, extraSkip, msg, kv...) +} + +// WarnSkip logs at warn level with extra caller skip frames for wrapper functions. +func WarnSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.WarnLevel, extraSkip, msg, kv...) +} + +// InfoSkip logs at info level with extra caller skip frames for wrapper functions. +func InfoSkip(ctx context.Context, extraSkip int, msg string, kv ...any) { + logFacade().logSkip(ctx, zapcore.InfoLevel, extraSkip, msg, kv...) +} + func logFacade() *facade { fOnce.Do(func() { f = &facade{ diff --git a/logger/logger.go b/logger/logger.go index 12deeef..862216f 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -47,6 +47,10 @@ func (l *logger) log(lvl zapcore.Level, msg string, kv ...any) { } func makeLogFields(ctx context.Context, kv ...any) []zap.Field { + return makeLogFieldsSkip(ctx, 0, kv...) +} + +func makeLogFieldsSkip(ctx context.Context, extraSkip int, kv ...any) []zap.Field { if len(kv)%2 != 0 { kv = append(kv, "unknown") } @@ -56,7 +60,7 @@ func makeLogFields(ctx context.Context, kv ...any) []zap.Field { spanID := spanCtx.SpanID().String() kv = append(kv, "traceID", traceID, "spanID", spanID) - funcName, file, line := getLoggerCallerInfo() + funcName, file, line := getLoggerCallerInfoSkip(extraSkip) kv = append(kv, "func", funcName, "file", file, "line", line) fields := make([]zap.Field, 0, len(kv)/2) for i := 0; i < len(kv); i += 2 { @@ -84,9 +88,14 @@ func makeLogFields(ctx context.Context, kv ...any) []zap.Field { return fields } -// getLoggerCallerInfo define func of return log caller information、method name、file name、line number +// getLoggerCallerInfo returns caller info at a fixed depth for the standard facade call chain. func getLoggerCallerInfo() (funcName, file string, line int) { - pc, file, line, ok := runtime.Caller(4) + return getLoggerCallerInfoSkip(0) +} + +// getLoggerCallerInfoSkip returns caller info with additional skip frames beyond the standard depth. +func getLoggerCallerInfoSkip(extraSkip int) (funcName, file string, line int) { + pc, file, line, ok := runtime.Caller(4 + extraSkip) if !ok { return } diff --git a/logger/loki_syncer.go b/logger/loki_syncer.go new file mode 100644 index 0000000..3792c2e --- /dev/null +++ b/logger/loki_syncer.go @@ -0,0 +1,132 @@ +// Package logger define log struct of eventRT project +package logger + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "maps" + "net/http" + "os" + "strconv" + "sync" + "time" + + "eventRT/config" +) + +type lokiPushRequest struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Stream map[string]string `json:"stream"` + Values [][2]string `json:"values"` +} + +// lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them +// to Loki's push API asynchronously. Errors are silently dropped so a unreachable +// Loki instance never blocks or crashes the application. +type lokiSyncer struct { + endpoint string + labels map[string]string + client *http.Client + ch chan string + wg sync.WaitGroup + closeOnce sync.Once +} + +func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer { + // always tag development logs with env=development; caller-supplied labels override if needed + labels := map[string]string{"env": "development"} + maps.Copy(labels, lCfg.Labels) + ls := &lokiSyncer{ + endpoint: lCfg.Endpoint + "/loki/api/v1/push", + labels: labels, + client: &http.Client{Timeout: 5 * time.Second}, + ch: make(chan string, 512), + } + ls.wg.Add(1) + go ls.run() + return ls +} + +func (ls *lokiSyncer) Write(p []byte) (int, error) { + select { + case ls.ch <- string(p): + default: + // channel full: drop the line rather than block the caller + } + return len(p), nil +} + +// Sync flushes remaining buffered lines and shuts down the background goroutine. +// Called by zap.Logger.Sync() at application shutdown. +func (ls *lokiSyncer) Sync() error { + ls.closeOnce.Do(func() { close(ls.ch) }) + ls.wg.Wait() + return nil +} + +func (ls *lokiSyncer) run() { + defer ls.wg.Done() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + var batch []string + flush := func() { + if len(batch) == 0 { + return + } + ls.push(batch) + batch = batch[:0] + } + + for { + select { + case line, ok := <-ls.ch: + if !ok { + flush() + return + } + batch = append(batch, line) + if len(batch) >= 100 { + flush() + } + case <-ticker.C: + flush() + } + } +} + +func (ls *lokiSyncer) push(lines []string) { + ts := strconv.FormatInt(time.Now().UnixNano(), 10) + values := make([][2]string, len(lines)) + for i, line := range lines { + values[i] = [2]string{ts, line} + } + + body, err := json.Marshal(lokiPushRequest{ + Streams: []lokiStream{{Stream: ls.labels, Values: values}}, + }) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := ls.client.Do(req) + if err != nil { + fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err) + return + } + defer resp.Body.Close() +} diff --git a/logger/zap.go b/logger/zap.go index cbf3d22..c716ffb 100644 --- a/logger/zap.go +++ b/logger/zap.go @@ -21,53 +21,89 @@ var ( _globalLogger *zap.Logger ) -// getEncoder responsible for setting the log format for encoding -func getEncoder() zapcore.Encoder { - encoderConfig := zap.NewProductionEncoderConfig() - // serialization time eg:2006-01-02 15:04:05 - encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") - encoderConfig.TimeKey = "time" - encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder - encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder - return zapcore.NewJSONEncoder(encoderConfig) +// getEncoder returns a console encoder for development (human-readable, colored) and a JSON encoder +// for container modes (parseable by Promtail pipeline_stages). +func getEncoder(mode string) zapcore.Encoder { + cfg := zap.NewProductionEncoderConfig() + cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") + cfg.TimeKey = "time" + cfg.EncodeCaller = zapcore.ShortCallerEncoder + if mode == constants.DevelopmentLogMode { + cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder + return zapcore.NewConsoleEncoder(cfg) + } + cfg.EncodeLevel = zapcore.CapitalLevelEncoder + return zapcore.NewJSONEncoder(cfg) } -// getLogWriter responsible for setting the location of log storage -func getLogWriter(mode, filename string, maxsize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer { - dateStr := time.Now().Format("2006-01-02 15:04:05") - finalFilename := fmt.Sprintf(filename, dateStr) - lumberJackLogger := &lumberjack.Logger{ - Filename: finalFilename, // log file position - MaxSize: maxsize, // log file maxsize - MaxAge: maxAge, // maximum number of day files retained - MaxBackups: maxBackup, // maximum number of old files retained - Compress: compress, // whether to compress +// getWriteSyncer returns write targets based on mode: +// - development: stdout + optional Loki direct-push (when loki.endpoint is set) +// - container modes: stdout always (Promtail collects) + rotating file (when filepath is set) +func getWriteSyncer(lCfg config.LoggerConfig) zapcore.WriteSyncer { + stdout := zapcore.AddSync(os.Stdout) + + if lCfg.Mode == constants.DevelopmentLogMode { + if lCfg.Loki.Endpoint == "" { + return stdout + } + return zapcore.NewMultiWriteSyncer(stdout, newLokiSyncer(lCfg.Loki)) } - syncConsole := zapcore.AddSync(os.Stderr) - if mode == constants.DevelopmentLogMode { - return syncConsole + syncers := []zapcore.WriteSyncer{stdout} + if lCfg.FilePath != "" { + dateStr := time.Now().Format("2006-01-02 15:04:05") + syncers = append(syncers, zapcore.AddSync(&lumberjack.Logger{ + Filename: fmt.Sprintf(lCfg.FilePath, dateStr), + MaxSize: lCfg.MaxSize, + MaxAge: lCfg.MaxAge, + MaxBackups: lCfg.MaxBackups, + Compress: lCfg.Compress, + })) } + return zapcore.NewMultiWriteSyncer(syncers...) +} - syncFile := zapcore.AddSync(lumberJackLogger) - return zapcore.NewMultiWriteSyncer(syncFile, syncConsole) +// containerFields reads K8s Downward API environment variables and returns them as global zap fields. +// These fields appear on every log line, allowing Loki/Grafana to filter by pod, namespace, and node. +// Inject them in the Deployment manifest: +// +// env: +// - name: K8S_NAMESPACE +// valueFrom: {fieldRef: {fieldPath: metadata.namespace}} +// - name: K8S_NODE_NAME +// valueFrom: {fieldRef: {fieldPath: spec.nodeName}} +func containerFields() []zap.Field { + var fields []zap.Field + // HOSTNAME is automatically set to the pod name by Kubernetes. + if pod := os.Getenv("HOSTNAME"); pod != "" { + fields = append(fields, zap.String("pod", pod)) + } + if ns := os.Getenv("K8S_NAMESPACE"); ns != "" { + fields = append(fields, zap.String("namespace", ns)) + } + if node := os.Getenv("K8S_NODE_NAME"); node != "" { + fields = append(fields, zap.String("node", node)) + } + return fields } // initLogger return successfully initialized zap logger func initLogger(lCfg config.LoggerConfig) *zap.Logger { - writeSyncer := getLogWriter(lCfg.Mode, lCfg.FilePath, lCfg.MaxSize, lCfg.MaxBackups, lCfg.MaxAge, lCfg.Compress) - encoder := getEncoder() + writeSyncer := getWriteSyncer(lCfg) + encoder := getEncoder(lCfg.Mode) l := new(zapcore.Level) - err := l.UnmarshalText([]byte(lCfg.Level)) - if err != nil { + if err := l.UnmarshalText([]byte(lCfg.Level)); err != nil { panic(err) } core := zapcore.NewCore(encoder, writeSyncer, l) - logger := zap.New(core, zap.AddCaller()) + opts := []zap.Option{zap.AddCaller()} + if lCfg.Mode != constants.DevelopmentLogMode { + opts = append(opts, zap.Fields(containerFields()...)) + } + logger := zap.New(core, opts...) - // 替换全局日志实例 zap.ReplaceGlobals(logger) return logger }