modelRT/handler/real_time_data_pull.go

230 lines
7.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"context"
"fmt"
"net/http"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/util"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// pullUpgrader upgrades incoming HTTP requests to websocket connections for
// the real time data pull endpoint. Read and write buffers are 1 KiB each.
// NOTE(review): CheckOrigin accepts every origin; confirm this is intended
// before exposing the endpoint to browsers on untrusted sites.
var pullUpgrader = websocket.Upgrader{
	CheckOrigin:     func(*http.Request) bool { return true },
	ReadBufferSize:  1 << 10,
	WriteBufferSize: 1 << 10,
}
// PullRealTimeDataHandler define real time data pull API
// @Summary 实时数据拉取 websocket api
// @Description 根据用户输入的clientID拉取对应的实时数据
// @Tags RealTime Component Websocket
// @Router /monitors/data/realtime/stream/:clientID [get]
//
// The handler upgrades the request to a websocket, starts the polling
// goroutine for clientID and a reader goroutine for client messages, and then
// forwards every batch received on the fan-in channel to the stream writer.
// It returns when the context is cancelled, which happens as soon as the
// reader goroutine stops (for example because the client closed the
// connection), or when the fan-in channel is closed.
func PullRealTimeDataHandler(c *gin.Context) {
	clientID := c.Param("clientID")
	if clientID == "" {
		err := fmt.Errorf("clientID is missing from the path")
		logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}

	conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error
		// response, so no second response body is written here.
		logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
		return
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	// TODO: revisit the channel buffer size for very large data volumes.
	fanInChan := make(chan []float64, 10000)
	go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan)

	// The reader goroutine receives client messages (heartbeats, stop
	// commands, close frames). When it stops, the context is cancelled so the
	// loop below and the polling goroutines shut down too.
	go func() {
		defer cancel()
		readClientMessages(ctx, conn, clientID)
	}()

	// Block until there is data to forward or the session ends; there is no
	// default branch, so the loop does not spin while the channel is empty.
	for {
		select {
		case <-ctx.Done():
			return
		case value, ok := <-fanInChan:
			if !ok {
				return
			}
			// Hand the batch taken from the fan-in channel to the websocket
			// stream writer.
			sendRealTimeDataStream(conn, clientID)
			fmt.Println(value, ok)
		}
	}
}
// readClientMessages keeps reading messages sent by the client (for example
// close frames or control commands) until a read fails, then returns.
//
// ctx is only used for logging; the loop ends when conn.ReadMessage returns
// an error. clientID identifies the client in log and console output.
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string) {
	// conn.SetReadLimit(512)
	for {
		msgType, _, err := conn.ReadMessage()
		if err != nil {
			switch {
			case websocket.IsCloseError(err, websocket.CloseNormalClosure):
				// The client closed the connection itself with code 1000,
				// the clearest signal of a deliberate exit.
				fmt.Printf("客户端 %s 主动正常关闭连接 (Code 1000)。\n", clientID)
			case websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure):
				// A close error whose code is neither "going away" nor
				// "abnormal closure". The logger takes key/value pairs, so
				// the details are passed as fields, not printf verbs.
				logger.Error(ctx, "read websocket message failed with unexpected close code", "clientID", clientID, "error", err)
			default:
				// Any other read error, such as an I/O failure.
				logger.Error(ctx, "read websocket message failed", "clientID", clientID, "error", err)
				fmt.Printf("客户端 %s 读取时发生未知错误: %v\n", clientID, err)
			}
			// Any read error ends the session.
			return
		}

		// Close frames normally surface as errors from ReadMessage, so only
		// regular data messages reach this point.
		if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage {
			fmt.Println(msgType)
			// ... handle the received message ...
		}
	}
}
// sendRealTimeDataStream is the hook the handler calls for each batch taken
// from the fan-in channel. It is currently a placeholder: it only prints the
// connection and the client ID and writes nothing to the websocket.
func sendRealTimeDataStream(conn *websocket.Conn, clientID string) {
fmt.Println(conn, clientID)
// NOTE: the sketch below is simulated push logic and must be replaced with
// the real data source.
// Simulated data source:
// dataChannel := globalMonitorState.GetDataChannel(clientID)
// Keep listening for data and write it to the websocket:
// for data := range dataChannel {
// err := conn.WriteJSON(data) // send structured data with WriteJSON
// if err != nil {
// log.Printf("clientID: %s 写入数据失败: %v", clientID, err)
// break // the write failed, drop the connection
// }
// }
}
// processTargetPolling starts one Redis polling goroutine for every target
// configured for clientID and then waits for target-change notifications or
// cancellation.
//
// The client's config is read from s.monitorMap under the read lock. For each
// target with a known measurement parameter and a valid query key, a stop
// channel is stored in a local map and realTimeDataQueryFromRedis is started
// with fanInChan as its output. The function returns when ctx is cancelled or
// the config's notice channel is closed; in both cases every stored stop
// channel is closed first. It also returns immediately, starting nothing,
// when no config exists for clientID.
func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan []float64) {
	stopChanMap := make(map[string]chan struct{})

	s.mutex.RLock()
	config, confExist := s.monitorMap[clientID]
	if !confExist {
		// Release the read lock on this early exit too; otherwise every
		// later writer on s.mutex would block forever.
		s.mutex.RUnlock()
		logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
		return
	}
	for interval, componentItems := range config.components {
		fmt.Println(componentItems)
		for _, target := range componentItems.targets {
			measurement, exist := componentItems.targetParam[target]
			if !exist {
				logger.Error(ctx, "can not found subscription node param into param map", "target", target)
				continue
			}

			queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
			if err != nil {
				logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err)
				continue
			}

			// Register the stop channel only once the goroutine is certain
			// to start, so the map never holds a channel nobody listens on.
			// TODO: check whether target already exists in stopChanMap
			// before overwriting its entry.
			queryGStopChan := make(chan struct{})
			stopChanMap[target] = queryGStopChan
			go realTimeDataQueryFromRedis(ctx, queryKey, interval, int64(measurement.Size), fanInChan, queryGStopChan)
		}
	}
	s.mutex.RUnlock()

	for {
		select {
		case transportTargets, ok := <-config.noticeChan:
			if !ok {
				logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID)
				stopAllPolling(ctx, stopChanMap)
				return
			}
			// The write lock is released explicitly at the end of each case.
			// A defer here would only run when the function returns, keeping
			// the lock held and deadlocking on the next notification.
			switch transportTargets.OperationType {
			case constants.OpAppend:
				// TODO: consider a finer-grained lock below the clientID level.
				s.mutex.Lock()
				// TODO: call the append function.
				fmt.Println(transportTargets.Targets)
				s.mutex.Unlock()
			case constants.OpRemove:
				s.mutex.Lock()
				// TODO: call the remove function.
				fmt.Println(transportTargets.Targets)
				s.mutex.Unlock()
			}
		case <-ctx.Done():
			logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
			stopAllPolling(ctx, stopChanMap)
			return
		}
	}
}
// stopAllPolling closes every stop channel stored in stopChanMap, logging the
// target each one belongs to, and then empties the map.
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
	for name := range stopChanMap {
		logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", name))
		close(stopChanMap[name])
	}
	clear(stopChanMap)
}
// realTimeDataQueryFromRedis queries Redis for queryKey on every tick of
// interval and sends each result to fanInChan until it is told to stop.
//
// interval must be a string accepted by time.ParseDuration (such as "5s") and
// must be positive; otherwise an error is logged and the function returns
// without polling. dataSize is passed through to the Redis query. Each query
// covers the range from the previous successful query's end timestamp to the
// current timestamp. The function returns when stopChan is closed or
// signalled, or when ctx is cancelled, including while it is waiting to
// deliver a result.
func realTimeDataQueryFromRedis(ctx context.Context, queryKey, interval string, dataSize int64, fanInChan chan []float64, stopChan chan struct{}) {
	duration, err := time.ParseDuration(interval)
	if err != nil {
		logger.Error(ctx, "failed to parse the time string", "interval", interval, "error", err)
		return
	}
	// time.NewTicker panics on a non-positive duration.
	if duration <= 0 {
		logger.Error(ctx, "polling interval must be positive", "interval", interval)
		return
	}

	// ParseDuration already returns a time.Duration, so it is used as is;
	// multiplying by time.Second would scale the interval by 1e9.
	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	client := diagram.NewRedisClient()
	startTimestamp := util.GenNanoTsStr()

	for {
		select {
		case <-ticker.C:
			stopTimestamp := util.GenNanoTsStr()
			datas, err := client.QueryByZRangeByLex(ctx, queryKey, dataSize, startTimestamp, stopTimestamp)
			if err != nil {
				logger.Error(ctx, "query real time data from redis failed", "key", queryKey, "error", err)
				continue
			}
			// The end timestamp becomes the start of the next query window.
			startTimestamp = stopTimestamp

			// Deliver the result, but stay responsive to stop and cancel
			// signals so a full channel with no consumer cannot leak this
			// goroutine.
			// TODO: decide whether data should be dropped or logged when
			// fanInChan stays full for a long time.
			select {
			case fanInChan <- datas:
			case <-stopChan:
				logger.Info(ctx, "stop the redis query goroutine via a singal")
				return
			case <-ctx.Done():
				logger.Info(ctx, "stop the redis query goroutine because the context is done")
				return
			}
		case <-stopChan:
			logger.Info(ctx, "stop the redis query goroutine via a singal")
			return
		case <-ctx.Done():
			logger.Info(ctx, "stop the redis query goroutine because the context is done")
			return
		}
	}
}