// Package logger define log struct of eventRT project package logger import ( "bytes" "context" "encoding/json" "fmt" "maps" "net/http" "os" "strconv" "sync" "time" "eventRT/config" ) type lokiPushRequest struct { Streams []lokiStream `json:"streams"` } type lokiStream struct { Stream map[string]string `json:"stream"` Values [][2]string `json:"values"` } // lokiSyncer implements zapcore.WriteSyncer, batching log lines and pushing them // to Loki's push API asynchronously. Errors are silently dropped so a unreachable // Loki instance never blocks or crashes the application. type lokiSyncer struct { endpoint string labels map[string]string client *http.Client ch chan string wg sync.WaitGroup closeOnce sync.Once } func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer { // always tag development logs with env=development; caller-supplied labels override if needed labels := map[string]string{"env": "development"} maps.Copy(labels, lCfg.Labels) ls := &lokiSyncer{ endpoint: lCfg.Endpoint + "/loki/api/v1/push", labels: labels, client: &http.Client{Timeout: 5 * time.Second}, ch: make(chan string, 512), } ls.wg.Add(1) go ls.run() return ls } func (ls *lokiSyncer) Write(p []byte) (int, error) { select { case ls.ch <- string(p): default: // channel full: drop the line rather than block the caller } return len(p), nil } // Sync flushes remaining buffered lines and shuts down the background goroutine. // Called by zap.Logger.Sync() at application shutdown. func (ls *lokiSyncer) Sync() error { ls.closeOnce.Do(func() { close(ls.ch) }) ls.wg.Wait() return nil } func (ls *lokiSyncer) run() { defer ls.wg.Done() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() var batch []string flush := func() { if len(batch) == 0 { return } ls.push(batch) batch = batch[:0] } for { select { case line, ok := <-ls.ch: if !ok { flush() return } batch = append(batch, line) if len(batch) >= 100 { flush() } case <-ticker.C: flush() } } } func (ls *lokiSyncer) push(lines []string) { ts := strconv.FormatInt(time.Now().UnixNano(), 10) values := make([][2]string, len(lines)) for i, line := range lines { values[i] = [2]string{ts, line} } body, err := json.Marshal(lokiPushRequest{ Streams: []lokiStream{{Stream: ls.labels, Values: values}}, }) if err != nil { return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, ls.endpoint, bytes.NewReader(body)) if err != nil { return } req.Header.Set("Content-Type", "application/json") resp, err := ls.client.Do(req) if err != nil { fmt.Fprintf(os.Stderr, "loki syncer: push failed: %v\n", err) return } defer resp.Body.Close() }