optimzie logic of real time data pull api

This commit is contained in:
douxu 2025-12-12 17:22:20 +08:00
parent d4d8c2c975
commit c92cee9575
3 changed files with 56 additions and 17 deletions

View File

@ -4,8 +4,14 @@ package constants
import "time"
const (
// FanInChanMaxSize define maximum buffer capacity by fanChannel
FanInChanMaxSize = 10000
// SendMaxBatchSize define maximum buffer capacity
// TODO 后续优化批处理大小
SendMaxBatchSize = 100
// SendChanBufferSize define maximum buffer capacity by channel
SendChanBufferSize = 100
// SendMaxBatchInterval define maximum aggregate latency
SendMaxBatchInterval = 200 * time.Millisecond
SendMaxBatchInterval = 20 * time.Millisecond
)

View File

@ -17,6 +17,12 @@ import (
"github.com/gin-gonic/gin"
)
type linkSetConfig struct {
CurrKey string
PrevKeyTemplate string
PrevIsNil bool
}
var linkSetConfigs = map[int]linkSetConfig{
// grid hierarchy
0: {CurrKey: constants.RedisAllGridSetKey, PrevIsNil: true},
@ -180,9 +186,3 @@ func processDiagramLinkError(err1, err2 error, action string) error {
}
return err
}
type linkSetConfig struct {
CurrKey string
PrevKeyTemplate string
PrevIsNil bool
}

View File

@ -61,9 +61,13 @@ func PullRealTimeDataHandler(c *gin.Context) {
defer cancel()
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
fanInChan := make(chan network.RealTimePullTarget, 10000)
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
go processTargetPolling(ctx, globalSubState, clientID, fanInChan)
go readClientMessages(ctx, conn, clientID, cancel)
go sendDataStream(ctx, conn, clientID, sendChan, cancel)
defer close(sendChan)
bufferMaxSize := constants.SendMaxBatchSize
sendMaxInterval := constants.SendMaxBatchInterval
@ -82,10 +86,12 @@ func PullRealTimeDataHandler(c *gin.Context) {
if len(buffer) >= bufferMaxSize {
// buffer is full, send immediately
if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
return
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
// reset the ticker to prevent it from triggering immediately after the ticker is sent
@ -94,19 +100,25 @@ func PullRealTimeDataHandler(c *gin.Context) {
case <-ticker.C:
if len(buffer) > 0 {
// when the ticker is triggered, all data in the send buffer is sent
if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
return
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
}
case <-ctx.Done():
// send the last remaining data
if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err)
if len(buffer) > 0 {
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
}
logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
}
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
return
}
}
@ -152,6 +164,27 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network
return conn.WriteJSON(response)
}
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for {
select {
case targetsData, ok := <-sendChan:
if !ok {
logger.Info(ctx, "send channel closed, sender goroutine exiting", "client_id", clientID)
return
}
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
cancel()
return
}
case <-ctx.Done():
logger.Info(ctx, "sender goroutine exiting as context is done", "client_id", clientID)
return
}
}
}
// processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) {
// ensure the fanInChan will not leak