optimize code of real time data pull api

This commit is contained in:
douxu 2026-01-28 14:03:25 +08:00
parent 617d21500e
commit 3ff29cc072
3 changed files with 70 additions and 83 deletions

View File

@ -30,3 +30,14 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
}
c.JSON(http.StatusOK, resp)
}
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
resp := network.WSResponse{
Code: code,
Msg: msg,
}
if payload != nil {
resp.Payload = payload
}
c.JSON(http.StatusOK, resp)
}

View File

@ -39,20 +39,14 @@ func PullRealTimeDataHandler(c *gin.Context) {
if clientID == "" {
err := fmt.Errorf("clientID is missing from the path")
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
return
}
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
return
}
defer conn.Close()
@ -71,7 +65,7 @@ func PullRealTimeDataHandler(c *gin.Context) {
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
go readClientMessages(ctx, conn, clientID, cancel)
@ -88,52 +82,33 @@ func PullRealTimeDataHandler(c *gin.Context) {
select {
case targetData, ok := <-fanInChan:
if !ok {
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
sendChan <- network.WSResponse{
Code: constants.RespCodeServerError,
Msg: "abnormal shutdown of data fan-in channel",
}
return
}
buffer = append(buffer, targetData)
if len(buffer) >= bufferMaxSize {
// buffer is full, send immediately
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
// reset the ticker to prevent it from triggering immediately after the ticker is sent
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
ticker.Reset(sendMaxInterval)
}
case <-ticker.C:
if len(buffer) > 0 {
// when the ticker is triggered, all data in the send buffer is sent
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
}
case <-ctx.Done():
// send the last remaining data
// last refresh before exiting
if len(buffer) > 0 {
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
}
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
}
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
return
}
}
}
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
// conn.SetReadLimit(512)
for {
@ -158,54 +133,47 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
}
}
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
if len(targetsData) == 0 {
return nil
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
if len(*buffer) == 0 {
return
}
response := network.SuccessResponse{
Code: 200,
Msg: "success",
resp := network.WSResponse{
Code: constants.RespCodeSuccess,
Msg: "process completed",
Payload: network.RealTimePullPayload{
Targets: targetsData,
Targets: *buffer,
},
}
return conn.WriteJSON(response)
select {
case sendChan <- resp:
default:
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
}
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
}
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for targetsData := range sendChan {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
err := conn.WriteJSON(map[string]any{
"code": 2101,
"msg": "all targets removed in given client_id",
"payload": map[string]int{
"active_targets_count": 0,
},
})
if err != nil {
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
cancel()
}
continue
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
defer func() {
if r := recover(); r != nil {
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
}
}()
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for resp := range sendChan {
if err := conn.WriteJSON(resp); err != nil {
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
cancel()
return
}
}
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
}
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
// ensure the fanInChan will not leak
defer close(fanInChan)
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
stopChanMap := make(map[string]chan struct{})
s.globalMutex.RLock()
@ -392,7 +360,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
}
// removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
for _, target := range removeTargets {
stopChan, exists := stopChanMap[target]
if !exists {
@ -411,17 +379,18 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
}
}
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
specialTarget := network.RealTimePullTarget{
ID: constants.SysCtrlAllRemoved,
Datas: []network.RealTimePullData{},
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
resp := network.WSResponse{
Code: constants.RespCodeSuccessWithNoSub,
Msg: "all targets removed",
Payload: map[string]int{"active_targets_count": 0},
}
select {
case sendChan <- []network.RealTimePullTarget{specialTarget}:
logger.Info(ctx, "sent 2101 status request to sendChan")
case sendChan <- resp:
default:
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
}
}

View File

@ -3,18 +3,25 @@ package network
// FailureResponse define struct of standard failure API response format
type FailureResponse struct {
Code int `json:"code" example:"500"`
Msg string `json:"msg" example:"failed to get recommend data from redis"`
Code int `json:"code" example:"3000"`
Msg string `json:"msg" example:"process completed with partial failures"`
Payload any `json:"payload" swaggertype:"object"`
}
// SuccessResponse define struct of standard successful API response format
type SuccessResponse struct {
Code int `json:"code" example:"200"`
Msg string `json:"msg" example:"success"`
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Payload any `json:"payload" swaggertype:"object"`
}
// WSResponse define struct of standard websocket API response format
type WSResponse struct {
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Payload any `json:"payload,omitempty" swaggertype:"object"`
}
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
type MeasurementRecommendPayload struct {
Input string `json:"input" example:"transformfeeder1_220."`