// Package handler provides HTTP handlers for various endpoints.
package handler

import (
	"context"
	"fmt"
	"net/http"
	"sort"
	"strconv"
	"time"

	"modelRT/constants"
	"modelRT/diagram"
	"modelRT/logger"
	"modelRT/model"
	"modelRT/network"
	"modelRT/util"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)
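
// pullUpgrader upgrades incoming HTTP requests to WebSocket connections.
// CheckOrigin always returns true, so connections from any origin are accepted;
// tighten this check if cross-origin access should be restricted.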
var pullUpgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(_ *http.Request) bool {
		return true
	},
}
// PullRealTimeDataHandler defines the real-time data pull API.
// @Summary real-time data pull websocket API
// @Description pulls the corresponding real-time data based on the clientID provided by the user
// @Tags RealTime Component Websocket
// @Router /monitors/data/realtime/stream/:clientID [get]
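//
// A client might consume this endpoint roughly as follows (sketch only; the host,
// port, and route prefix are assumptions, not taken from this file):
//
//	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/monitors/data/realtime/stream/client-1", nil)
//	if err != nil {
//		// handle dial error
//	}
//	defer conn.Close()
//	for {
//		_, msg, err := conn.ReadMessage() // each message is one aggregated JSON batch
//		if err != nil {
//			break
//		}
//		_ = msg
//	}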
func PullRealTimeDataHandler(c *gin.Context) {
	clientID := c.Param("clientID")
	if clientID == "" {
		err := fmt.Errorf("clientID is missing from the path")
		logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}

	conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	// TODO[BACKPRESSURE-ISSUE] for now, define the fan-in channel with a fixed large capacity #1
	fanInChan := make(chan network.RealTimePullTarget, 10000)

	go processTargetPolling(ctx, globalSubState, clientID, fanInChan)
	go readClientMessages(ctx, conn, clientID, cancel)

	bufferMaxSize := constants.SendMaxBatchSize
	sendMaxInterval := constants.SendMaxBatchInterval
	buffer := make([]network.RealTimePullTarget, 0, bufferMaxSize)
	ticker := time.NewTicker(sendMaxInterval)
	defer ticker.Stop()
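
	// Batching loop: frames received from fanInChan are buffered and flushed either
	// when the buffer reaches bufferMaxSize or when sendMaxInterval elapses,
	// whichever comes first.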
	for {
		select {
		case targetData, ok := <-fanInChan:
			if !ok {
				logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
				return
			}
			buffer = append(buffer, targetData)
			if len(buffer) >= bufferMaxSize {
				// buffer is full, send immediately
				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
					logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
					return
				}
				// reset buffer
				buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
				// reset the ticker so it does not fire immediately after a size-triggered send
				ticker.Reset(sendMaxInterval)
			}
		case <-ticker.C:
			if len(buffer) > 0 {
				// when the ticker fires, send all data currently in the buffer
				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
					logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
					return
				}
				// reset buffer
				buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
			}
		case <-ctx.Done():
			// send the last remaining data
			if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
				logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err)
			}
			logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done", "client_id", clientID)
			return
		}
	}
}

// readClientMessages continuously listens for messages sent by the client
// (e.g. Ping/Pong, close frames, or control commands).
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
	// conn.SetReadLimit(512)
	for {
		msgType, msgBytes, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				logger.Info(ctx, "client actively and normally closed the connection", "client_id", clientID)
			} else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logger.Error(ctx, "an unexpected error occurred while reading the webSocket connection", "client_id", clientID, "error", err)
			} else {
				// handle other read errors (e.g. I/O errors)
				logger.Error(ctx, "an error occurred while reading the webSocket connection", "client_id", clientID, "error", err)
			}
			cancel()
			break
		}
		// process normal messages from the client
		if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage {
			logger.Info(ctx, "read normal message from client", "client_id", clientID, "content", string(msgBytes))
		}
	}
}

// sendAggregateRealTimeDataStream pushes one batch of aggregated real-time data to the client.
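// Each call writes a single JSON frame shaped roughly like
//	{"code":200,"msg":"success","payload":{"targets":[...]}}
// (the exact field names depend on the json tags of network.SuccessResponse and
// network.RealTimePullPayload, which are not shown in this file).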
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
	if len(targetsData) == 0 {
		return nil
	}
	response := network.SuccessResponse{
		Code: 200,
		Msg:  "success",
		Payload: network.RealTimePullPayload{
			Targets: targetsData,
		},
	}
	return conn.WriteJSON(response)
}

// processTargetPolling processes the targets in the subscription map and continuously retrieves data from redis for each target.
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) {
	// ensure the fanInChan will not leak
	defer close(fanInChan)

	logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))

	stopChanMap := make(map[string]chan struct{})

	s.globalMutex.RLock()
	config, confExist := s.subMap[clientID]
	if !confExist {
		logger.Error(ctx, "cannot find config in the local subscription map by clientID", "clientID", clientID)
		s.globalMutex.RUnlock()
		return
	}
	s.globalMutex.RUnlock()

	logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "measurements len", len(config.measurements))

	config.mutex.RLock()
	for interval, measurementTargets := range config.measurements {
		for _, target := range measurementTargets {
			// add a secondary check to prevent the target from already existing in the stopChanMap
			if _, exists := stopChanMap[target]; exists {
				logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target)
				continue
			}

			targetContext, exist := config.targetContext[target]
			if !exist {
				logger.Error(ctx, "cannot find subscription target context in the context map", "target", target)
				continue
			}
			measurementInfo := targetContext.measurement

			queryKey, err := model.GenerateMeasureIdentifier(measurementInfo.DataSource)
			if err != nil {
				logger.Error(ctx, "generate measurement identifier by data_source field failed", "data_source", measurementInfo.DataSource, "error", err)
				continue
			}

			// store the stop channel for this target only after the query key is successfully generated
			queryGStopChan := make(chan struct{})
			stopChanMap[target] = queryGStopChan

			pollingConfig := redisPollingConfig{
				targetID: target,
				queryKey: queryKey,
				interval: interval,
				dataSize: int64(measurementInfo.Size),
			}
			go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
		}
	}
	config.mutex.RUnlock()
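
	// Watch for subscription changes delivered through noticeChan and adjust the
	// per-target polling goroutines accordingly until the context is cancelled.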
	for {
		select {
		case transportTargets, ok := <-config.noticeChan:
			if !ok {
				logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID)
				stopAllPolling(ctx, stopChanMap)
				return
			}
			config.mutex.Lock()
			switch transportTargets.OperationType {
			case constants.OpAppend:
				appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
			case constants.OpRemove:
				removeTargets(ctx, stopChanMap, transportTargets.Targets)
			case constants.OpUpdate:
				// TODO: handle the update operation
			}
			config.mutex.Unlock()
		case <-ctx.Done():
			logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
			stopAllPolling(ctx, stopChanMap)
			return
		}
	}
}

// appendTargets starts new polling goroutines for targets that were just added.
func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
	appendTargetsSet := make(map[string]struct{}, len(appendTargets))
	for _, target := range appendTargets {
		appendTargetsSet[target] = struct{}{}
	}

	for _, target := range appendTargets {
		targetContext, exists := config.targetContext[target]
		if !exists {
			logger.Error(ctx, "the appended target does not exist in the real time data config context map, skipping the startup step", "target", target)
			continue
		}

		if _, exists := stopChanMap[target]; exists {
			logger.Error(ctx, "the appended target already has a stop channel, skipping the startup step", "target", target)
			continue
		}

		interval := targetContext.interval
		measurementTargets, ok := config.measurements[interval]
		if !ok {
			logger.Error(ctx, "targetContext exists but measurements is missing, cannot update config", "target", target, "interval", interval)
			continue
		}

		queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource)
		if err != nil {
			logger.Error(ctx, "the appended target failed to generate the redis query key identifier", "target", target, "error", err)
			continue
		}

		// register the stop channel and persist the target only after all preconditions pass;
		// write the appended slice back so the new target is stored in the config
		queryGStopChan := make(chan struct{})
		stopChanMap[target] = queryGStopChan
		config.measurements[interval] = append(measurementTargets, target)
		config.targetContext[target] = targetContext
		delete(appendTargetsSet, target)

		pollingConfig := redisPollingConfig{
			targetID: target,
			queryKey: queryKey,
			interval: targetContext.interval,
			dataSize: int64(targetContext.measurement.Size),
		}
		go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
		logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", targetContext.interval)
	}

	allKeys := util.GetKeysFromSet(appendTargetsSet)
	if len(allKeys) > 0 {
		logger.Warn(ctx, fmt.Sprintf("polling goroutines were not started for the following targets:%v", allKeys))
		clear(appendTargetsSet)
	}
}

// removeTargets stops the running polling goroutines for targets that were removed.
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) {
	for _, target := range removeTargets {
		stopChan, exists := stopChanMap[target]
		if !exists {
			logger.Warn(ctx, "the removed target was not running, skipping the remove operation", "target", target)
			continue
		}
		close(stopChan)
		delete(stopChanMap, target)
		logger.Info(ctx, "stopped polling goroutine for removed target", "target", target)
	}
}

// stopAllPolling stops all running query goroutines for a specific client.
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
	for target, stopChan := range stopChanMap {
		logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", target))
		close(stopChan)
	}
	clear(stopChanMap)
}

// redisPollingConfig holds the parameters for querying real-time data from redis.
type redisPollingConfig struct {
	targetID string
	queryKey string
	interval string
	dataSize int64
}
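
// realTimeDataQueryFromRedis polls redis at the configured interval and forwards
// each query result to fanInChan until the stop channel is closed.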
func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) {
	logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize)

	duration, err := time.ParseDuration(config.interval)
	if err != nil {
		logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err)
		return
	}

	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	client := diagram.NewRedisClient()

	// perform the first query immediately, then query again on every tick
	needPerformQuery := true
	for {
		if needPerformQuery {
			performQuery(ctx, client, config, fanInChan)
			needPerformQuery = false
		}
		select {
		case <-ticker.C:
			needPerformQuery = true
		case <-stopChan:
			logger.Info(ctx, "stop the redis query goroutine via a signal")
			return
		}
	}
}
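
// performQuery runs a single redis range query for the configured target and pushes the
// assembled frame into fanInChan, dropping the frame if the channel is full.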
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
	members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
	if err != nil {
		logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
		return
	}

	pullDatas := make([]network.RealTimePullData, 0, len(members))
	for _, member := range members {
		pullDatas = append(pullDatas, network.RealTimePullData{
			Time:  member.Member.(string),
			Value: member.Score,
		})
	}
	sortPullDataByTimeAscending(ctx, pullDatas)

	targetData := network.RealTimePullTarget{
		ID:    config.targetID,
		Datas: pullDatas,
	}
	select {
	case fanInChan <- targetData:
	default:
		// TODO[BACKPRESSURE-ISSUE] fanInChan may block; when a large volume of data blocks the query loop and frames are dropped, apply backpressure to resolve the issue #1
		logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members))
	}
}
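
// sortPullDataByTimeAscending sorts the pull data in place by numeric timestamp in ascending
// order; entries whose timestamps cannot be parsed are pushed toward the end of the slice.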
func sortPullDataByTimeAscending(ctx context.Context, data []network.RealTimePullData) {
	sort.Slice(data, func(i, j int) bool {
		t1, err1 := strconv.ParseInt(data[i].Time, 10, 64)
		if err1 != nil {
			logger.Error(ctx, "parsing real time data timestamp failed", "index", i, "time", data[i].Time, "error", err1)
			return false
		}
		t2, err2 := strconv.ParseInt(data[j].Time, 10, 64)
		if err2 != nil {
			logger.Error(ctx, "parsing real time data timestamp failed", "index", j, "time", data[j].Time, "error", err2)
			return true
		}
		return t1 < t2
	})
}