From 3ff29cc0727bc723249fc619735522ecdd54dd58 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 28 Jan 2026 14:03:25 +0800 Subject: [PATCH] optimize code of real time data pull api --- handler/helper.go | 11 +++ handler/real_time_data_pull.go | 127 +++++++++++++-------------------- network/response.go | 15 ++-- 3 files changed, 70 insertions(+), 83 deletions(-) diff --git a/handler/helper.go b/handler/helper.go index 6589409..532d93b 100644 --- a/handler/helper.go +++ b/handler/helper.go @@ -30,3 +30,14 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) { } c.JSON(http.StatusOK, resp) } + +func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) { + resp := network.WSResponse{ + Code: code, + Msg: msg, + } + if payload != nil { + resp.Payload = payload + } + c.JSON(http.StatusOK, resp) +} diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index a360a49..24ccf50 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -39,20 +39,14 @@ func PullRealTimeDataHandler(c *gin.Context) { if clientID == "" { err := fmt.Errorf("clientID is missing from the path") logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), - }) + renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil) return } conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { - logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), - }) + logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err) + renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil) return } defer conn.Close() @@ -71,7 +65,7 @@ func PullRealTimeDataHandler(c *gin.Context) { // TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1 fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize) - sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize) + sendChan := make(chan network.WSResponse, constants.SendChanBufferSize) go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan) go readClientMessages(ctx, conn, clientID, cancel) @@ -88,52 +82,33 @@ func PullRealTimeDataHandler(c *gin.Context) { select { case targetData, ok := <-fanInChan: if !ok { - logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID) + sendChan <- network.WSResponse{ + Code: constants.RespCodeServerError, + Msg: "abnormal shutdown of data fan-in channel", + } return } + buffer = append(buffer, targetData) - if len(buffer) >= bufferMaxSize { - // buffer is full, send immediately - select { - case sendChan <- buffer: - default: - logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID) - } - - // reset buffer - buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) - // reset the ticker to prevent it from triggering immediately after the ticker is sent + flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full") ticker.Reset(sendMaxInterval) } case <-ticker.C: if len(buffer) > 0 { - // when the ticker is triggered, all data in the send buffer is sent - select { - case sendChan <- buffer: - default: - logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID) - } - - // reset buffer - buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) + flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout") } case <-ctx.Done(): - // send the last remaining data + // last refresh before exiting if len(buffer) > 0 { - select { - case sendChan <- buffer: - default: - logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID) - } + flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown") } - logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID) return } } } -// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令) +// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands) func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) { // conn.SetReadLimit(512) for { @@ -158,54 +133,47 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri } } -// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client -func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error { - if len(targetsData) == 0 { - return nil +func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) { + if len(*buffer) == 0 { + return } - response := network.SuccessResponse{ - Code: 200, - Msg: "success", + + resp := network.WSResponse{ + Code: constants.RespCodeSuccess, + Msg: "process completed", Payload: network.RealTimePullPayload{ - Targets: targetsData, + Targets: *buffer, }, } - return conn.WriteJSON(response) + + select { + case sendChan <- resp: + default: + logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason) + } + *buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize) } // sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client -func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) { - logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID) - for targetsData := range sendChan { - // TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展 - if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved { - err := conn.WriteJSON(map[string]any{ - "code": 2101, - "msg": "all targets removed in given client_id", - "payload": map[string]int{ - "active_targets_count": 0, - }, - }) - if err != nil { - logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err) - cancel() - } - continue +func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) { + defer func() { + if r := recover(); r != nil { + logger.Error(ctx, "sendDataStream recovered from panic", "err", r) } + }() - if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil { - logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err) + logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID) + for resp := range sendChan { + if err := conn.WriteJSON(resp); err != nil { + logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err) cancel() return } } - logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID) } // processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target -func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) { - // ensure the fanInChan will not leak - defer close(fanInChan) +func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) { logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID)) stopChanMap := make(map[string]chan struct{}) s.globalMutex.RLock() @@ -392,7 +360,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m } // removeTargets define func to stops running polling goroutines for targets that were removed -func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) { +func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) { for _, target := range removeTargets { stopChan, exists := stopChanMap[target] if !exists { @@ -411,17 +379,18 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re } } -func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) { - specialTarget := network.RealTimePullTarget{ - ID: constants.SysCtrlAllRemoved, - Datas: []network.RealTimePullData{}, +func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) { + // TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展 + resp := network.WSResponse{ + Code: constants.RespCodeSuccessWithNoSub, + Msg: "all targets removed", + Payload: map[string]int{"active_targets_count": 0}, } select { - case sendChan <- []network.RealTimePullTarget{specialTarget}: - logger.Info(ctx, "sent 2101 status request to sendChan") + case sendChan <- resp: default: - logger.Warn(ctx, "sendChan is full, skipping 2101 status message") + logger.Warn(ctx, "sendChan is full, skipping 2101 status") } } diff --git a/network/response.go b/network/response.go index 4bed98d..b111df7 100644 --- a/network/response.go +++ b/network/response.go @@ -3,18 +3,25 @@ package network // FailureResponse define struct of standard failure API response format type FailureResponse struct { - Code int `json:"code" example:"500"` - Msg string `json:"msg" example:"failed to get recommend data from redis"` + Code int `json:"code" example:"3000"` + Msg string `json:"msg" example:"process completed with partial failures"` Payload any `json:"payload" swaggertype:"object"` } // SuccessResponse define struct of standard successful API response format type SuccessResponse struct { - Code int `json:"code" example:"200"` - Msg string `json:"msg" example:"success"` + Code int `json:"code" example:"2000"` + Msg string `json:"msg" example:"process completed"` Payload any `json:"payload" swaggertype:"object"` } +// WSResponse define struct of standard websocket API response format +type WSResponse struct { + Code int `json:"code" example:"2000"` + Msg string `json:"msg" example:"process completed"` + Payload any `json:"payload,omitempty" swaggertype:"object"` +} + // MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response. type MeasurementRecommendPayload struct { Input string `json:"input" example:"transformfeeder1_220."`