modelRT/handler/real_time_data_pull.go

207 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"context"
"fmt"
"net/http"
"time"
"modelRT/logger"
"modelRT/network"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// pullUpgrader upgrades the HTTP request of the real-time pull endpoint to a
// WebSocket connection, using 1 KiB read and write buffers.
//
// NOTE(review): CheckOrigin accepts every request regardless of its Origin
// header, which disables the browser same-origin protection for this
// endpoint. Confirm this is intended before exposing it outside a trusted
// network.
var pullUpgrader = websocket.Upgrader{
	CheckOrigin:     func(*http.Request) bool { return true },
	WriteBufferSize: 1024,
	ReadBufferSize:  1024,
}
// PullRealTimeDataHandler defines the real-time data pull API. It upgrades the
// request to a WebSocket connection and then forwards everything the polling
// goroutines publish on the fan-in channel until the connection ends.
//
// The handler returns (closing the connection and cancelling all polling
// started for this client) when the request context is cancelled, when the
// client read loop stops, or when the fan-in channel is closed.
//
// @Summary real-time data pull websocket api
// @Description pull the real-time data that corresponds to the clientID supplied by the user
// @Tags RealTime Component Websocket
// @Router /monitors/data/realtime/stream/:clientID [get]
func PullRealTimeDataHandler(c *gin.Context) {
	clientID := c.Param("clientID")
	if clientID == "" {
		err := fmt.Errorf("clientID is missing from the path")
		logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}

	conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error
		// response, so writing another body here would corrupt the reply.
		logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
		return
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	fanInChan := make(chan []float64)
	go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan)

	// Read client messages (heartbeats, stop commands, close frames) in the
	// background. When the read loop ends the connection is no longer usable,
	// so cancel the context to stop this handler and every polling goroutine.
	go func() {
		defer cancel()
		readClientMessages(ctx, conn, clientID)
	}()

	// Block until there is something to do; a default branch here would turn
	// the loop into a busy spin.
	for {
		select {
		case <-ctx.Done():
			return
		case value, ok := <-fanInChan:
			if !ok {
				// No producer is left, nothing more will ever arrive.
				return
			}
			// Data taken from the fan-in channel is written to the websocket
			// response stream.
			sendRealTimeDataStream(conn, clientID)
			fmt.Println(value, ok)
		}
	}
}
// readClientMessages keeps reading messages sent by the client (for example
// control commands or close frames) until a read fails, then returns.
//
// A read error is classified before returning:
//   - a close frame with code 1000 (normal closure) is reported as a clean,
//     client-initiated shutdown;
//   - a close frame whose code is neither 1001 (going away) nor 1006
//     (abnormal closure) is logged as an unexpected close;
//   - every other error (including I/O failures) is logged as a read failure.
//
// ctx is only used for logging; the loop ends when conn.ReadMessage fails,
// e.g. after the connection has been closed.
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string) {
	// conn.SetReadLimit(512)
	for {
		msgType, _, err := conn.ReadMessage()
		if err != nil {
			switch {
			case websocket.IsCloseError(err, websocket.CloseNormalClosure):
				// The client asked for a normal close: the clearest possible
				// signal that it left on purpose.
				fmt.Printf("客户端 %s 主动正常关闭连接 (Code 1000)。\n", clientID)
			case websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure):
				logger.Error(ctx, "websocket connection closed unexpectedly", "clientID", clientID, "error", err)
			default:
				logger.Error(ctx, "read message from websocket connection failed", "clientID", clientID, "error", err)
				fmt.Printf("客户端 %s 读取时发生未知错误: %v\n", clientID, err)
			}
			// The connection cannot be read again after an error.
			return
		}

		// Close frames normally surface as an error from ReadMessage, so only
		// data frames reach this point.
		if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage {
			fmt.Println(msgType)
			// TODO: handle the received message.
		}
	}
}
// sendRealTimeDataStream is the hook for pushing real-time data to the client
// over conn. It is still a placeholder: for now it only prints its arguments.
//
// Sketch of the intended implementation (replace with the real data source):
//
//	dataChannel := globalMonitorState.GetDataChannel(clientID)
//	for data := range dataChannel {
//		// WriteJSON sends structured data.
//		if err := conn.WriteJSON(data); err != nil {
//			log.Printf("clientID: %s write failed: %v", clientID, err)
//			break // a failed write means the connection is gone
//		}
//	}
func sendRealTimeDataStream(conn *websocket.Conn, clientID string) {
	fmt.Println(conn, clientID)
}
// processTargetPolling starts one polling goroutine per subscribed target of
// the given client and then supervises them.
//
// The client's configuration is looked up in s.monitorMap under the read lock.
// For every target that has an entry in targetParam, a stop channel is created
// and realTimeDataQueryFromRedis is started with the component group's
// interval; its results are published on fanInChan. Targets without a
// parameter entry are logged and skipped.
//
// The function then blocks until either ctx is cancelled or the client's
// notice channel is closed, at which point every polling goroutine is told to
// stop via stopAllPolling. If no configuration exists for clientID it logs an
// error and returns immediately.
func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan []float64) {
	stopChanMap := make(map[string]chan struct{})

	s.mutex.RLock()
	config, confExist := s.monitorMap[clientID]
	if !confExist {
		// Release the read lock on this early exit as well; otherwise every
		// later writer on s.mutex would block forever.
		s.mutex.RUnlock()
		logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
		return
	}

	for interval, componentItems := range config.components {
		fmt.Println(componentItems)
		for _, target := range componentItems.targets {
			dataSize, exist := componentItems.targetParam[target]
			if !exist {
				logger.Error(ctx, "can not found subscription node param into param map", "target", target)
				continue
			}

			// Remember the stop channel so this target's goroutine can be
			// stopped later.
			stopChan := make(chan struct{})
			stopChanMap[target] = stopChan
			go realTimeDataQueryFromRedis(ctx, interval, dataSize, fanInChan, stopChan)
		}
	}
	s.mutex.RUnlock()

	// TODO: keep listening on the notice channel and react to its messages.
	for {
		select {
		case noticeValue, ok := <-config.noticeChan:
			if !ok {
				logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID)
				stopAllPolling(ctx, stopChanMap)
				return
			}
			// TODO: inspect the notice type and call the add or remove
			// routine accordingly.
			fmt.Println(noticeValue, ok)
		case <-ctx.Done():
			logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
			stopAllPolling(ctx, stopChanMap)
			return
		}
	}
}
// stopAllPolling closes every stop channel stored in stopChanMap, logging one
// info line per target, and leaves the map empty afterwards.
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
	for name, done := range stopChanMap {
		msg := fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", name)
		logger.Info(ctx, msg)
		close(done)
		// Removing the current key while ranging over a map is safe in Go.
		delete(stopChanMap, name)
	}
}
// realTimeDataQueryFromRedis periodically produces a result slice of length
// dataSize and offers it on fanInChan until stopChan is closed or ctx is
// cancelled.
//
// interval is parsed with time.ParseDuration (e.g. "500ms", "10s"). A value
// that fails to parse is retried with an "s" suffix, so a bare number such as
// "10" is read as seconds. NOTE(review): the bare-number form is inferred from
// the original code multiplying the parsed value by time.Second; confirm the
// format actually stored in the monitor configuration.
//
// The function logs an error and returns without polling when interval cannot
// be parsed, when it is not positive, or when dataSize is negative.
//
// TODO: query redis according to the datasource field of the Measure table.
func realTimeDataQueryFromRedis(ctx context.Context, interval string, dataSize int, fanInChan chan []float64, stopChan chan struct{}) {
	duration, err := time.ParseDuration(interval)
	if err != nil {
		if seconds, secErr := time.ParseDuration(interval + "s"); secErr == nil {
			duration, err = seconds, nil
		}
	}
	if err != nil {
		logger.Error(ctx, "failed to parse the time string", "interval", interval, "error", err)
		return
	}
	// time.NewTicker panics on a non-positive period and make panics on a
	// negative length; reject both instead of crashing the process.
	if duration <= 0 {
		logger.Error(ctx, "polling interval must be positive", "interval", interval)
		return
	}
	if dataSize < 0 {
		logger.Error(ctx, "data size must not be negative", "dataSize", dataSize)
		return
	}

	// duration is already a time.Duration; it must not be scaled again.
	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// TODO: add the redis query logic.
			result := make([]float64, dataSize)
			select {
			case fanInChan <- result:
			default:
				// The receiver is not ready: drop this sample rather than
				// block the polling loop.
				// TODO: consider logging the dropped sample.
			}
		case <-stopChan:
			// TODO: improve this log output.
			logger.Info(ctx, "redis query goroutine have stopped")
			return
		case <-ctx.Done():
			// Stop even if the stop channel is never closed, so the
			// goroutine cannot outlive the connection it serves.
			return
		}
	}
}