From c92cee9575489d1d749cd05ab1f8246df48e0740 Mon Sep 17 00:00:00 2001
From: douxu
Date: Fri, 12 Dec 2025 17:22:20 +0800
Subject: [PATCH] optimize logic of real time data pull api

---
 constants/buffer.go            |  8 ++++-
 handler/diagram_node_link.go   | 12 ++++----
 handler/real_time_data_pull.go | 53 +++++++++++++++++++++++++++-------
 3 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/constants/buffer.go b/constants/buffer.go
index e8079f8..9e99f90 100644
--- a/constants/buffer.go
+++ b/constants/buffer.go
@@ -4,8 +4,14 @@ package constants
 import "time"
 
 const (
+	// FanInChanMaxSize define maximum buffer capacity by fanChannel
+	FanInChanMaxSize = 10000
 	// SendMaxBatchSize define maximum buffer capacity
+	// TODO 后续优化批处理大小
 	SendMaxBatchSize = 100
+	// SendChanBufferSize define maximum buffer capacity by channel
+	SendChanBufferSize = 100
+
 	// SendMaxBatchInterval define maximum aggregate latency
-	SendMaxBatchInterval = 200 * time.Millisecond
+	SendMaxBatchInterval = 20 * time.Millisecond
 )
diff --git a/handler/diagram_node_link.go b/handler/diagram_node_link.go
index 23696b4..0c2bd5c 100644
--- a/handler/diagram_node_link.go
+++ b/handler/diagram_node_link.go
@@ -17,6 +17,12 @@ import (
 	"github.com/gin-gonic/gin"
 )
 
+type linkSetConfig struct {
+	CurrKey         string
+	PrevKeyTemplate string
+	PrevIsNil       bool
+}
+
 var linkSetConfigs = map[int]linkSetConfig{
 	// grid hierarchy
 	0: {CurrKey: constants.RedisAllGridSetKey, PrevIsNil: true},
@@ -180,9 +186,3 @@ func processDiagramLinkError(err1, err2 error, action string) error {
 	}
 	return err
 }
-
-type linkSetConfig struct {
-	CurrKey         string
-	PrevKeyTemplate string
-	PrevIsNil       bool
-}
diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go
index f54d756..f8fe232 100644
--- a/handler/real_time_data_pull.go
+++ b/handler/real_time_data_pull.go
@@ -61,9 +61,13 @@ func PullRealTimeDataHandler(c *gin.Context) {
 	defer cancel()
 
 	// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
-	fanInChan := make(chan network.RealTimePullTarget, 10000)
+	fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
+	sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
+
 	go processTargetPolling(ctx, globalSubState, clientID, fanInChan)
 	go readClientMessages(ctx, conn, clientID, cancel)
+	go sendDataStream(ctx, conn, clientID, sendChan, cancel)
+	defer close(sendChan)
 
 	bufferMaxSize := constants.SendMaxBatchSize
 	sendMaxInterval := constants.SendMaxBatchInterval
@@ -82,10 +86,12 @@ func PullRealTimeDataHandler(c *gin.Context) {
 			if len(buffer) >= bufferMaxSize {
 				// buffer is full, send immediately
-				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
-					logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
-					return
+				select {
+				case sendChan <- buffer:
+				default:
+					logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
 				}
+
 
 				// reset buffer
 				buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
 				// reset the ticker to prevent it from triggering immediately after the ticker is sent
@@ -94,19 +100,25 @@ func PullRealTimeDataHandler(c *gin.Context) {
 		case <-ticker.C:
 			if len(buffer) > 0 {
 				// when the ticker is triggered, all data in the send buffer is sent
-				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
-					logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
-					return
+				select {
+				case sendChan <- buffer:
+				default:
+					logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
 				}
+
 
 				// reset buffer
 				buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
 			}
 		case <-ctx.Done():
 			// send the last remaining data
-			if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
-				logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err)
+			if len(buffer) > 0 {
+				select {
+				case sendChan <- buffer:
+				default:
+					logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
+				}
 			}
-			logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
+			logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
 			return
 		}
@@ -152,6 +164,27 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network
 	return conn.WriteJSON(response)
 }
 
+func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
+	logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
+	for {
+		select {
+		case targetsData, ok := <-sendChan:
+			if !ok {
+				logger.Info(ctx, "send channel closed, sender goroutine exiting", "client_id", clientID)
+				return
+			}
+			if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
+				logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
+				cancel()
+				return
+			}
+		case <-ctx.Done():
+			logger.Info(ctx, "sender goroutine exiting as context is done", "client_id", clientID)
+			return
+		}
+	}
+}
+
 // processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target
 func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) {
 	// ensure the fanInChan will not leak