modelRT/handler/real_time_data_pull.go

265 lines
8.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"context"
"fmt"
"net/http"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/util"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var pullUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(_ *http.Request) bool {
return true
},
}
// PullRealTimeDataHandler define real time data pull API
// @Summary 实时数据拉取 websocket api
// @Description 根据用户输入的clientID拉取对应的实时数据
// @Tags RealTime Component Websocket
// @Router /monitors/data/realtime/stream/:clientID [get]
func PullRealTimeDataHandler(c *gin.Context) {
clientID := c.Param("clientID")
if clientID == "" {
err := fmt.Errorf("clientID is missing from the path")
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(c.Request.Context())
defer cancel()
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
fanInChan := make(chan network.RealTimePullTarget, 10000)
go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan)
go readClientMessages(ctx, conn, clientID, cancel)
for {
select {
case targetData, ok := <-fanInChan:
if !ok {
logger.Error(ctx, "fanInChan closed unexpectedly", "clientID", clientID)
return
}
// TODO 考虑后续将多个 targets 聚合发送
err := sendRealTimeDataStream(conn, targetData)
if err != nil {
logger.Error(nil, "clientID: %s 写入数据失败: %v", clientID, err)
}
default:
// TODO[BACKPRESSURE-ISSUE] 考虑在此使用双重背压方式解决阻塞问题 #1
logger.Warn(ctx, "Write channel full, dropping data for slow client.")
}
}
}
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
// conn.SetReadLimit(512)
for {
msgType, msgBytes, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
logger.Info(ctx, "client actively and normally closed the connection", "client_id", clientID)
} else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
// 这是您代码中已有的逻辑,用于处理非正常或服务器离开的关闭(如网络中断、浏览器关闭 Tab 但未发送关闭帧)。
logger.Error(ctx, "clientID: %s 读取时发生错误: %v", clientID, err)
} else {
// 处理其他读取错误(如 I/O 错误)
logger.Error(ctx, "clientID: %s 读取时发生错误: %v", clientID, err)
fmt.Printf("客户端 %s 读取时发生未知错误: %v\n", clientID, err)
}
cancel()
break
}
// process normal message from client
if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage {
logger.Info(ctx, "read normal message from client", "client_id", clientID, "msg", string(msgBytes))
}
}
}
// sendRealTimeDataStream define func to responsible for continuously pushing real-time data to the client
func sendRealTimeDataStream(conn *websocket.Conn, targetData network.RealTimePullTarget) error {
response := network.SuccessResponse{
Code: 200,
Msg: "success",
PayLoad: network.RealTimePullPayload{
Targets: []network.RealTimePullTarget{targetData},
},
}
return conn.WriteJSON(response)
}
// processTargetPolling define function to process target in monitor map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan network.RealTimePullTarget) {
// ensure the fanInChan will not leak
defer close(fanInChan)
stopChanMap := make(map[string]chan struct{})
s.mutex.RLock()
config, confExist := s.monitorMap[clientID]
if !confExist {
logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
return
}
for interval, componentItems := range config.components {
for _, target := range componentItems.targets {
// add a secondary check to prevent the target from already existing in the stopChanMap
if _, exists := stopChanMap[target]; exists {
logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target)
continue
}
measurement, exist := componentItems.targetParam[target]
if !exist {
logger.Error(ctx, "can not found subscription node param into param map", "target", target)
continue
}
queryGStopChan := make(chan struct{})
// store stop channel with target into map
stopChanMap[target] = queryGStopChan
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
if err != nil {
logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err)
continue
}
config := redisPollingConfig{
targetID: target,
queryKey: queryKey,
interval: interval,
dataSize: int64(measurement.Size),
}
go realTimeDataQueryFromRedis(ctx, config, fanInChan, queryGStopChan)
}
}
s.mutex.RUnlock()
for {
select {
case transportTargets, ok := <-config.noticeChan:
if !ok {
logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID)
stopAllPolling(ctx, stopChanMap)
return
}
switch transportTargets.OperationType {
case constants.OpAppend:
// TODO 考虑精细化锁结果将RW锁置于ClientID层面之下
s.mutex.Lock()
defer s.mutex.Unlock()
// TODO 增加 append 函数调用
fmt.Println(transportTargets.Targets)
case constants.OpRemove:
s.mutex.Lock()
defer s.mutex.Unlock()
// TODO 增加 remove 函数调用
fmt.Println(transportTargets.Targets)
}
case <-ctx.Done():
logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
stopAllPolling(ctx, stopChanMap)
return
}
}
}
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
for target, stopChan := range stopChanMap {
logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", target))
close(stopChan)
}
clear(stopChanMap)
return
}
// redisPollingConfig define struct for param which query real time data from redis
type redisPollingConfig struct {
targetID string
queryKey string
interval string
dataSize int64
}
func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) {
duration, err := time.ParseDuration(config.interval)
if err != nil {
logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err)
return
}
ticker := time.NewTicker(duration * time.Second)
defer ticker.Stop()
client := diagram.NewRedisClient()
startTimestamp := util.GenNanoTsStr()
for {
select {
case <-ticker.C:
stopTimestamp := util.GenNanoTsStr()
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize, startTimestamp, stopTimestamp)
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
continue
}
// use end timestamp reset start timestamp
startTimestamp = stopTimestamp
pullDatas := make([]network.RealTimePullData, 0, len(members))
for _, member := range members {
pullDatas = append(pullDatas, network.RealTimePullData{
Time: member.Member.(string),
Value: member.Score,
})
}
targetData := network.RealTimePullTarget{
ID: config.targetID,
Datas: pullDatas,
}
select {
case fanInChan <- targetData:
default:
// TODO[BACKPRESSURE-ISSUE] 考虑 fanInChan 阻塞,当出现大量数据阻塞查询循环并丢弃时,采取背压方式解决问题 #1
logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members))
}
case <-stopChan:
logger.Info(ctx, "stop the redis query goroutine via a singal")
return
}
}
}