diff --git a/diagram/redis_client.go b/diagram/redis_client.go index 7319b02..6d7bf1a 100644 --- a/diagram/redis_client.go +++ b/diagram/redis_client.go @@ -20,10 +20,9 @@ func NewRedisClient() *RedisClient { } // QueryByZRangeByLex define func to query real time data from redis zset -func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64, startTimestamp, stopTimeStamp string) ([]float64, error) { +func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64, startTimestamp, stopTimeStamp string) ([]redis.Z, error) { client := rc.Client - datas := make([]float64, 0, size) startStr := "[" + startTimestamp stopStr := stopTimeStamp + "]" args := redis.ZRangeArgs{ @@ -34,13 +33,5 @@ func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size Rev: false, Count: size, } - - members, err := client.ZRangeArgsWithScores(ctx, args).Result() - if err != nil { - return nil, err - } - for data := range members { - datas = append(datas, float64(data)) - } - return datas, nil + return client.ZRangeArgsWithScores(ctx, args).Result() } diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 6c151a5..e386b0a 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -57,35 +57,39 @@ func PullRealTimeDataHandler(c *gin.Context) { ctx, cancel := context.WithCancel(c.Request.Context()) defer cancel() - // TODO 考虑数据量庞大时候,channel的缓存大小问题 - fanInChan := make(chan []float64, 10000) + // TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1 + fanInChan := make(chan network.RealTimePullTarget, 10000) go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan) - // 主循环:负责接收客户端消息(如心跳包、停止拉取命令等) - go readClientMessages(ctx, conn, clientID) + go readClientMessages(ctx, conn, clientID, cancel) for { select { - case value, ok := <-fanInChan: - // 从扇入通道拿去数据后,将数据写入websocket 返回流中 - sendRealTimeDataStream(conn, clientID) - fmt.Println(value, ok) + case targetData, ok := <-fanInChan: + if !ok { + logger.Error(ctx, "fanInChan closed unexpectedly", "clientID", clientID) + return + } + // TODO 考虑后续将多个 targets 聚合发送 + err := sendRealTimeDataStream(conn, targetData) + if err != nil { + logger.Error(nil, "clientID: %s 写入数据失败: %v", clientID, err) + } default: - fmt.Println("default") + // TODO[BACKPRESSURE-ISSUE] 考虑在此使用双重背压方式解决阻塞问题 #1 + logger.Warn(ctx, "Write channel full, dropping data for slow client.") } } } // readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令) -func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string) { +func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) { // conn.SetReadLimit(512) for { - msgType, _, err := conn.ReadMessage() + msgType, msgBytes, err := conn.ReadMessage() if err != nil { - // **【核心逻辑】判断是否为客户端主动正常关闭** if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - fmt.Printf("客户端 %s 主动正常关闭连接 (Code 1000)。\n", clientID) - // 客户端主动发起的正常关闭,这是最清晰的主动退出信号。 + logger.Info(ctx, "client actively and normally closed the connection", "client_id", clientID) } else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { // 这是您代码中已有的逻辑,用于处理非正常或服务器离开的关闭(如网络中断、浏览器关闭 Tab 但未发送关闭帧)。 logger.Error(ctx, "clientID: %s 读取时发生错误: %v", clientID, err) @@ -94,39 +98,34 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri logger.Error(ctx, "clientID: %s 读取时发生错误: %v", clientID, err) fmt.Printf("客户端 %s 读取时发生未知错误: %v\n", clientID, err) } - - break // 退出循环,断开连接 + cancel() + break } - // 客户端发送关闭帧时,msgType 可能是 websocket.CloseMessage,但通常在 ReadMessage 中被转换为错误。 - // 如果客户端主动发送了数据(非关闭帧),在这里继续处理 + // process normal message from client if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { - fmt.Println(msgType) - // ... 处理正常接收到的消息 ... + logger.Info(ctx, "read normal message from client", "client_id", clientID, "msg", string(msgBytes)) } } } -// sendRealTimeDataStream 负责持续向客户端推送实时数据 -func sendRealTimeDataStream(conn *websocket.Conn, clientID string) { - fmt.Println(conn, clientID) - // ⚠️ 这是一个模拟的推送逻辑,您需要替换为您的实际数据源 - - // 模拟数据源 - // dataChannel := globalMonitorState.GetDataChannel(clientID) - - // 持续监听数据,并写入 WebSocket - // for data := range dataChannel { - // err := conn.WriteJSON(data) // 使用 WriteJSON 发送结构化数据 - // if err != nil { - // log.Printf("clientID: %s 写入数据失败: %v", clientID, err) - // break // 写入失败,断开连接 - // } - // } +// sendRealTimeDataStream define func to responsible for continuously pushing real-time data to the client +func sendRealTimeDataStream(conn *websocket.Conn, targetData network.RealTimePullTarget) error { + response := network.SuccessResponse{ + Code: 200, + Msg: "success", + PayLoad: network.RealTimePullPayload{ + Targets: []network.RealTimePullTarget{targetData}, + }, + } + return conn.WriteJSON(response) } // processTargetPolling define function to process target in monitor map and data is continuously retrieved from redis based on the target -func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan []float64) { +func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan network.RealTimePullTarget) { + // ensure the fanInChan will not leak + defer close(fanInChan) + stopChanMap := make(map[string]chan struct{}) s.mutex.RLock() @@ -137,8 +136,13 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s } for interval, componentItems := range config.components { - fmt.Println(componentItems) for _, target := range componentItems.targets { + // add a secondary check to prevent the target from already existing in the stopChanMap + if _, exists := stopChanMap[target]; exists { + logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target) + continue + } + measurement, exist := componentItems.targetParam[target] if !exist { logger.Error(ctx, "can not found subscription node param into param map", "target", target) @@ -147,14 +151,20 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s queryGStopChan := make(chan struct{}) // store stop channel with target into map - // TODO 增加二次检查,首先判断target是否存在于stopChanMap中 stopChanMap[target] = queryGStopChan queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) if err != nil { logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err) continue } - go realTimeDataQueryFromRedis(ctx, queryKey, interval, int64(measurement.Size), fanInChan, queryGStopChan) + + config := redisPollingConfig{ + targetID: target, + queryKey: queryKey, + interval: interval, + dataSize: int64(measurement.Size), + } + go realTimeDataQueryFromRedis(ctx, config, fanInChan, queryGStopChan) } } s.mutex.RUnlock() @@ -197,10 +207,18 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) { return } -func realTimeDataQueryFromRedis(ctx context.Context, queryKey, interval string, dataSize int64, fanInChan chan []float64, stopChan chan struct{}) { - duration, err := time.ParseDuration(interval) +// redisPollingConfig define struct for param which query real time data from redis +type redisPollingConfig struct { + targetID string + queryKey string + interval string + dataSize int64 +} + +func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) { + duration, err := time.ParseDuration(config.interval) if err != nil { - logger.Error(ctx, "failed to parse the time string", "interval", interval, "error", err) + logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err) return } ticker := time.NewTicker(duration * time.Second) @@ -212,15 +230,32 @@ func realTimeDataQueryFromRedis(ctx context.Context, queryKey, interval string, select { case <-ticker.C: stopTimestamp := util.GenNanoTsStr() - datas, err := client.QueryByZRangeByLex(ctx, queryKey, dataSize, startTimestamp, stopTimestamp) + members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize, startTimestamp, stopTimestamp) if err != nil { - logger.Error(ctx, "query real time data from redis failed", "key", queryKey, "error", err) + logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) continue } // use end timestamp reset start timestamp startTimestamp = stopTimestamp - // TODO 考虑如果 fanInChan 阻塞,如何避免阻塞查询循环,是否可以丢弃数据或使用日志记录的方式进行填补 - fanInChan <- datas + + pullDatas := make([]network.RealTimePullData, 0, len(members)) + for _, member := range members { + pullDatas = append(pullDatas, network.RealTimePullData{ + Time: member.Member.(string), + Value: member.Score, + }) + } + targetData := network.RealTimePullTarget{ + ID: config.targetID, + Datas: pullDatas, + } + + select { + case fanInChan <- targetData: + default: + // TODO[BACKPRESSURE-ISSUE] 考虑 fanInChan 阻塞,当出现大量数据阻塞查询循环并丢弃时,采取背压方式解决问题 #1 + logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members)) + } case <-stopChan: logger.Info(ctx, "stop the redis query goroutine via a singal") return diff --git a/network/real_time_data_request.go b/network/real_time_data_request.go index 960439a..2dc51ab 100644 --- a/network/real_time_data_request.go +++ b/network/real_time_data_request.go @@ -28,3 +28,21 @@ type RealTimeComponentItem struct { Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` Targets []string `json:"targets" example:"[\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\"]" description:"需要采集数据的测点或标签名称列表"` } + +// RealTimePullPayload define struct of pull real time data payload +type RealTimePullPayload struct { + // required: true + Targets []RealTimePullTarget `json:"targets" example:"{\"targets\":[{\"id\":\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]},{\"id\":\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]}]}" description:"实时数据"` +} + +// RealTimePullTarget define struct of pull real time data target +type RealTimePullTarget struct { + ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms" description:"实时数据ID值"` + Datas []RealTimePullData `json:"datas" example:"[{\"time\":1736305467506000000,\"value\":220},{\"time\":1736305467506000000,\"value\":220}]" description:"实时数据值数组"` +} + +// RealTimePullData define struct of pull real time data param +type RealTimePullData struct { + Time string `json:"time" example:"1736305467506000000" description:"实时数据时间戳"` + Value float64 `json:"value" example:"220" description:"实时数据值"` +}