modelRT/handler/real_time_data_pull.go

397 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"context"
"fmt"
"net/http"
"sort"
"strconv"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/util"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// pullUpgrader upgrades HTTP requests on the real-time pull endpoint to
// websocket connections, using 1024-byte read and write buffers.
//
// NOTE(review): CheckOrigin accepts every request, so the handshake is not
// restricted by origin - confirm that this is intended for this endpoint.
var pullUpgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// accept the handshake regardless of where the request comes from
	CheckOrigin: func(*http.Request) bool { return true },
}
// PullRealTimeDataHandler upgrades the request to a websocket connection and
// streams aggregated real-time data for the clientID taken from the path.
//
// Two goroutines are started per connection: processTargetPolling, which
// feeds fanInChan, and readClientMessages, which cancels the context once
// reading from the client fails. The handler itself batches what arrives on
// fanInChan and writes a batch whenever it reaches
// constants.SendMaxBatchSize entries or constants.SendMaxBatchInterval
// elapses, whichever happens first.
//
// @Summary real-time data pull websocket api
// @Description pull the real-time data that belongs to the clientID supplied by the user
// @Tags RealTime Component Websocket
// @Router /monitors/data/realtime/stream/:clientID [get]
func PullRealTimeDataHandler(c *gin.Context) {
	clientID := c.Param("clientID")
	if clientID == "" {
		err := fmt.Errorf("clientID is missing from the path")
		logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}

	conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error
		// response, so no second response body may be written here.
		logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
		return
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	// TODO[BACKPRESSURE-ISSUE] for now the fan-in channel is simply given a fixed, large capacity #1
	fanInChan := make(chan network.RealTimePullTarget, 10000)
	go processTargetPolling(ctx, globalSubState, clientID, fanInChan)
	go readClientMessages(ctx, conn, clientID, cancel)

	bufferMaxSize := constants.SendMaxBatchSize
	sendMaxInterval := constants.SendMaxBatchInterval
	buffer := make([]network.RealTimePullTarget, 0, bufferMaxSize)
	ticker := time.NewTicker(sendMaxInterval)
	defer ticker.Stop()

	// flushRemaining makes a best-effort attempt to deliver whatever is still
	// buffered when the handler is about to exit.
	flushRemaining := func() {
		if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
			logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err)
		}
	}

	for {
		select {
		case targetData, ok := <-fanInChan:
			if !ok {
				flushRemaining()
				// processTargetPolling closes fanInChan when it returns, which
				// also happens after ctx is cancelled; only a closure while
				// ctx is still alive is unexpected.
				if ctx.Err() != nil {
					logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
				} else {
					logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
				}
				return
			}

			buffer = append(buffer, targetData)
			if len(buffer) >= bufferMaxSize {
				// buffer is full, send immediately
				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
					logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
					return
				}
				// the write above is synchronous, so the backing array can be reused
				buffer = buffer[:0]
				// restart the interval so the ticker does not fire right after this send
				ticker.Reset(sendMaxInterval)
			}
		case <-ticker.C:
			if len(buffer) > 0 {
				// when the ticker is triggered, all data in the send buffer is sent
				if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil {
					logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err)
					return
				}
				buffer = buffer[:0]
			}
		case <-ctx.Done():
			// send the last remaining data
			flushRemaining()
			logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
			return
		}
	}
}
// readClientMessages keeps reading messages sent by the client until a read
// fails. Text and binary messages are only logged. On the first read error
// the error is logged according to its kind, cancel is invoked so that the
// rest of the connection's goroutines can stop, and the function returns.
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
	// conn.SetReadLimit(512)
	for {
		kind, payload, readErr := conn.ReadMessage()
		if readErr == nil {
			// process normal message from client
			switch kind {
			case websocket.TextMessage, websocket.BinaryMessage:
				logger.Info(ctx, "read normal message from client", "client_id", clientID, "content", string(payload))
			}
			continue
		}

		switch {
		case websocket.IsCloseError(readErr, websocket.CloseNormalClosure):
			logger.Info(ctx, "client actively and normally closed the connection", "client_id", clientID)
		case websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure):
			logger.Error(ctx, "an unexpected error occurred while reading the webSocket connection", "client_id", clientID, "error", readErr)
		default:
			// handle other read errors (eg, I/O errors)
			logger.Error(ctx, "an error occurred while reading the webSocket connection", "client_id", clientID, "error", readErr)
		}

		// the connection can no longer be read: signal shutdown and stop
		cancel()
		return
	}
}
// sendAggregateRealTimeDataStream writes one batch of real-time target data
// to the client as a JSON success response. An empty batch is not sent and
// nil is returned; otherwise the result of conn.WriteJSON is returned.
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
	if len(targetsData) == 0 {
		// nothing to deliver
		return nil
	}

	payload := network.RealTimePullPayload{Targets: targetsData}
	envelope := network.SuccessResponse{
		Code:    200,
		Msg:     "success",
		Payload: payload,
	}
	return conn.WriteJSON(envelope)
}
// processTargetPolling starts one redis polling goroutine per subscribed
// target of clientID and then keeps the set of running goroutines in sync
// with the append/remove notices received on the subscription's noticeChan.
//
// It returns when ctx is done, when noticeChan is closed, or immediately when
// no subscription config exists for clientID. In every case the polling
// goroutines it knows about are signalled to stop and fanInChan is closed.
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) {
	// ensure the fanInChan will not leak
	// NOTE(review): the polling goroutines are only signalled to stop, not
	// waited for, so one of them may still be sending on fanInChan when it is
	// closed here - confirm whether a wait is needed before closing.
	defer close(fanInChan)

	logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))

	s.globalMutex.RLock()
	config, confExist := s.subMap[clientID]
	s.globalMutex.RUnlock()
	if !confExist {
		logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
		return
	}

	// stopChanMap maps each running target to the channel that stops its goroutine
	stopChanMap := make(map[string]chan struct{})

	// config.components is mutated under config.mutex elsewhere, so every
	// read of it (including the ones used for logging) happens under the lock
	config.mutex.RLock()
	logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", len(config.components))
	for interval, componentItems := range config.components {
		logger.Info(ctx, fmt.Sprintf("interval %s len of componentItems:%d\n", interval, len(componentItems.targets)))
		for _, target := range componentItems.targets {
			// add a secondary check to prevent the target from already existing in the stopChanMap
			if _, exists := stopChanMap[target]; exists {
				logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target)
				continue
			}

			measurement, exist := componentItems.targetParam[target]
			if !exist {
				logger.Error(ctx, "can not found subscription node param into param map", "target", target)
				continue
			}

			queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
			if err != nil {
				logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err)
				continue
			}

			// register the stop channel only once the goroutine is certain to
			// start; a stale entry would make later appends of this target be
			// skipped as "already running"
			queryGStopChan := make(chan struct{})
			stopChanMap[target] = queryGStopChan

			pollingConfig := redisPollingConfig{
				targetID: target,
				queryKey: queryKey,
				interval: interval,
				dataSize: int64(measurement.Size),
			}
			go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
		}
	}
	config.mutex.RUnlock()

	for {
		select {
		case transportTargets, ok := <-config.noticeChan:
			if !ok {
				logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID)
				stopAllPolling(ctx, stopChanMap)
				return
			}

			config.mutex.Lock()
			switch transportTargets.OperationType {
			case constants.OpAppend:
				appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
			case constants.OpRemove:
				removeTargets(ctx, stopChanMap, transportTargets.Targets)
			default:
				logger.Warn(ctx, "unknown operation type in subscription notice, ignoring", "clientID", clientID, "operation_type", transportTargets.OperationType)
			}
			config.mutex.Unlock()
		case <-ctx.Done():
			logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
			stopAllPolling(ctx, stopChanMap)
			return
		}
	}
}
// appendTargets starts a polling goroutine for every requested target that is
// present in config.components and is not already running. Each started
// target is recorded in stopChanMap together with its stop channel. Requested
// targets that could not be started are reported with an error log at the end.
//
// The call site in this file invokes it while holding config.mutex.
func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
	// pending holds the requested targets that have not been started yet
	pending := make(map[string]struct{}, len(appendTargets))
	for i := range appendTargets {
		pending[appendTargets[i]] = struct{}{}
	}

	for interval, group := range config.components {
		for _, name := range group.targets {
			if _, requested := pending[name]; !requested {
				continue
			}

			if _, running := stopChanMap[name]; running {
				logger.Warn(ctx, "append target already running, skipping", "target", name)
				continue
			}

			param, found := group.targetParam[name]
			if !found {
				logger.Error(ctx, "append target can not find measurement params for new target", "target", name)
				continue
			}

			key, err := model.GenerateMeasureIdentifier(param.DataSource)
			if err != nil {
				logger.Error(ctx, "append target generate measurement identifier failed", "target", name, "error", err)
				continue
			}

			stop := make(chan struct{})
			stopChanMap[name] = stop
			go realTimeDataQueryFromRedis(ctx, redisPollingConfig{
				targetID: name,
				queryKey: key,
				interval: interval,
				dataSize: int64(param.Size),
			}, fanInChan, stop)
			logger.Info(ctx, "started new polling goroutine for appended target", "target", name, "interval", interval)

			// started successfully, so it is no longer pending
			delete(pending, name)
		}
	}

	for name := range pending {
		logger.Error(ctx, "append target: failed to find config for target, goroutine not started", "target", name)
	}
}
// removeTargets stops the polling goroutine of every listed target that is
// currently tracked in stopChanMap by closing its stop channel, and removes
// the entry from the map. Targets that are not tracked are skipped with a
// warning.
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, targetsToRemove []string) {
	for i := range targetsToRemove {
		name := targetsToRemove[i]
		if stop, running := stopChanMap[name]; running {
			// closing the channel is the stop signal for the goroutine
			close(stop)
			delete(stopChanMap, name)
			logger.Info(ctx, "stopped polling goroutine for removed target", "target", name)
		} else {
			logger.Warn(ctx, "removeTarget was not running, skipping", "target", name)
		}
	}
}
// stopAllPolling signals every polling goroutine tracked in stopChanMap to
// stop by closing its stop channel, then empties the map.
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
	for name := range stopChanMap {
		logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", name))
		close(stopChanMap[name])
	}
	// drop all entries so no channel can be closed a second time through this map
	clear(stopChanMap)
}
// redisPollingConfig bundles the parameters of one redis polling goroutine
// that queries real-time data for a single target.
type redisPollingConfig struct {
	// targetID is copied into the ID of every result produced for the target.
	targetID string
	// queryKey is the redis key that is queried.
	queryKey string
	// interval is the polling period as a duration string; it is parsed with
	// time.ParseDuration before use.
	interval string
	// dataSize is handed to the redis query together with queryKey.
	dataSize int64
}
// realTimeDataQueryFromRedis polls redis for one target. It performs a query
// immediately and then once per config.interval, handing every result to
// performQuery for delivery on fanInChan.
//
// The goroutine ends when stopChan is closed or ctx is done. It also returns
// right away, after logging an error, when config.interval cannot be parsed
// or is not a positive duration.
func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) {
	logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize)

	duration, err := time.ParseDuration(config.interval)
	if err != nil {
		logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err)
		return
	}
	// time.NewTicker panics on a non-positive duration, so reject it up front
	if duration <= 0 {
		logger.Error(ctx, "polling interval must be a positive duration", "targetID", config.targetID, "interval", config.interval)
		return
	}

	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	client := diagram.NewRedisClient()
	startTimestamp := util.GenNanoTsStr()
	logger.Info(ctx, "redis query goroutine polling started", "targetID", config.targetID, "duration", duration.String(), "startTimestamp", startTimestamp)

	// query once right away instead of waiting for the first tick
	performQuery(ctx, client, config, fanInChan)
	for {
		select {
		case <-ticker.C:
			performQuery(ctx, client, config, fanInChan)
		case <-stopChan:
			logger.Info(ctx, "stop the redis query goroutine via a singal")
			return
		case <-ctx.Done():
			// do not outlive the connection even if the stop signal never arrives
			logger.Info(ctx, "stop the redis query goroutine as context is done", "targetID", config.targetID)
			return
		}
	}
}
// performQuery runs one redis query for config.queryKey, converts the
// returned members into time/value pairs (the member is used as the time and
// the score as the value), sorts them by time in ascending order and offers
// the result to fanInChan.
//
// The send never blocks: when fanInChan is full the frame is dropped and a
// warning is logged. A failed query is logged and produces no frame.
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
	members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
	if err != nil {
		logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
		return
	}

	pullDatas := make([]network.RealTimePullData, 0, len(members))
	for _, member := range members {
		// guard the assertion: an unchecked one would panic and take the
		// whole process down on a single non-string member
		timeStr, ok := member.Member.(string)
		if !ok {
			logger.Error(ctx, "unexpected member type in redis query result, skipping member", "key", config.queryKey, "member", member.Member)
			continue
		}
		pullDatas = append(pullDatas, network.RealTimePullData{
			Time:  timeStr,
			Value: member.Score,
		})
	}
	sortPullDataByTimeAscending(ctx, pullDatas)

	targetData := network.RealTimePullTarget{
		ID:    config.targetID,
		Datas: pullDatas,
	}

	select {
	case fanInChan <- targetData:
	default:
		// TODO[BACKPRESSURE-ISSUE] fanInChan may fill up; when a large amount of data blocks the query loop and frames are dropped, solve it with backpressure #1
		logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members))
	}
}
// sortPullDataByTimeAscending sorts data in place by its Time field, parsed
// as a base-10 int64, in ascending order. Entries whose Time cannot be parsed
// are logged once, with their index in the input, and placed after all
// parseable entries.
func sortPullDataByTimeAscending(ctx context.Context, data []network.RealTimePullData) {
	// keyedItem pairs an entry with its pre-parsed timestamp so that every
	// Time string is parsed (and every failure logged) exactly once instead
	// of on each comparison
	type keyedItem struct {
		item  network.RealTimePullData
		ts    int64
		valid bool
	}

	keyed := make([]keyedItem, len(data))
	for i := range data {
		ts, err := strconv.ParseInt(data[i].Time, 10, 64)
		if err != nil {
			logger.Error(ctx, "parsing real time data timestamp failed", "index", i, "time", data[i].Time, "error", err)
		}
		keyed[i] = keyedItem{item: data[i], ts: ts, valid: err == nil}
	}

	sort.SliceStable(keyed, func(i, j int) bool {
		a, b := keyed[i], keyed[j]
		if a.valid != b.valid {
			// parseable entries sort before unparseable ones
			return a.valid
		}
		return a.valid && a.ts < b.ts
	})

	for i := range keyed {
		data[i] = keyed[i].item
	}
}