// Package handler provides HTTP handlers for various endpoints. package handler import ( "context" "fmt" "net/http" "sort" "strconv" "time" "modelRT/constants" "modelRT/diagram" "modelRT/logger" "modelRT/model" "modelRT/network" "modelRT/util" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) var pullUpgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(_ *http.Request) bool { return true }, } // PullRealTimeDataHandler define real time data pull API // @Summary 实时数据拉取 websocket api // @Description 根据用户输入的clientID拉取对应的实时数据 // @Tags RealTime Component Websocket // @Router /monitors/data/realtime/stream/:clientID [get] func PullRealTimeDataHandler(c *gin.Context) { clientID := c.Param("clientID") if clientID == "" { err := fmt.Errorf("clientID is missing from the path") logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } defer conn.Close() ctx, cancel := context.WithCancel(c.Request.Context()) defer cancel() // TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1 fanInChan := make(chan network.RealTimePullTarget, 10000) go processTargetPolling(ctx, globalSubState, clientID, fanInChan) go readClientMessages(ctx, conn, clientID, cancel) bufferMaxSize := constants.SendMaxBatchSize sendMaxInterval := constants.SendMaxBatchInterval buffer := make([]network.RealTimePullTarget, 0, bufferMaxSize) ticker := time.NewTicker(sendMaxInterval) defer ticker.Stop() for { select { case targetData, ok := <-fanInChan: if !ok { logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID) return } buffer = append(buffer, targetData) if len(buffer) >= bufferMaxSize { // buffer is full, send immediately if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) return } // reset buffer buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) // reset the ticker to prevent it from triggering immediately after the ticker is sent ticker.Reset(sendMaxInterval) } case <-ticker.C: if len(buffer) > 0 { // when the ticker is triggered, all data in the send buffer is sent if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) return } // reset buffer buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) } case <-ctx.Done(): // send the last remaining data if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err) } logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID) return } } } // readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令) func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) { // conn.SetReadLimit(512) for { msgType, msgBytes, err := conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { logger.Info(ctx, "client actively and normally closed the connection", "client_id", clientID) } else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { logger.Error(ctx, "an unexpected error occurred while reading the webSocket connection", "client_id", clientID, "error", err) } else { // handle other read errors (eg, I/O errors) logger.Error(ctx, "an error occurred while reading the webSocket connection", "client_id", clientID, "error", err) } cancel() break } // process normal message from client if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { logger.Info(ctx, "read normal message from client", "client_id", clientID, "content", string(msgBytes)) } } } // sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error { if len(targetsData) == 0 { return nil } response := network.SuccessResponse{ Code: 200, Msg: "success", Payload: network.RealTimePullPayload{ Targets: targetsData, }, } return conn.WriteJSON(response) } // processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) { // ensure the fanInChan will not leak defer close(fanInChan) logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID)) stopChanMap := make(map[string]chan struct{}) s.globalMutex.RLock() config, confExist := s.subMap[clientID] if !confExist { logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID) s.globalMutex.RUnlock() return } s.globalMutex.RUnlock() // TODO 测试log fmt.Printf("found subscription config for clientID:%s, start initial polling goroutines, config: %+v\n", clientID, config.components) logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.components) config.mutex.RLock() for interval, measurementTargets := range config.measurements { for _, target := range measurementTargets { // add a secondary check to prevent the target from already existing in the stopChanMap if _, exists := stopChanMap[target]; exists { logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target) continue } targetContext, exist := config.targetContext[target] if !exist { logger.Error(ctx, "can not found subscription node param into param map", "target", target) continue } measurementInfo := targetContext.measurement queryGStopChan := make(chan struct{}) // store stop channel with target into map stopChanMap[target] = queryGStopChan queryKey, err := model.GenerateMeasureIdentifier(measurementInfo.DataSource) if err != nil { logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurementInfo.DataSource, "error", err) continue } pollingConfig := redisPollingConfig{ targetID: target, queryKey: queryKey, interval: interval, dataSize: int64(measurementInfo.Size), } go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan) } } config.mutex.RUnlock() for { select { case transportTargets, ok := <-config.noticeChan: if !ok { logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID) stopAllPolling(ctx, stopChanMap) return } config.mutex.Lock() switch transportTargets.OperationType { case constants.OpAppend: appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) case constants.OpRemove: removeTargets(ctx, config, stopChanMap, transportTargets.Targets) } config.mutex.Unlock() case <-ctx.Done(): logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID)) stopAllPolling(ctx, stopChanMap) return } } } // appendTargets starts new polling goroutines for targets that were just added func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) { appendTargetsSet := make(map[string]struct{}, len(appendTargets)) for _, target := range appendTargets { appendTargetsSet[target] = struct{}{} } for _, target := range appendTargets { targetContext, exists := config.targetContext[target] if exists { logger.Warn(ctx, "the append target already exists in the real time data fetch process,skipping the startup step", "target", target) continue } if _, exists := stopChanMap[target]; exists { logger.Warn(ctx, "the append target already has a stop channel, skipping the startup step", "target", target) continue } queryGStopChan := make(chan struct{}) stopChanMap[target] = queryGStopChan interval := targetContext.interval measurementTargets, ok := config.measurements[interval] if !ok { logger.Error(ctx, "targetContext exists but measurements is missing, cannot update config", "target", target, "interval", interval) continue } measurementTargets = append(measurementTargets, target) config.targetContext[target] = targetContext delete(appendTargetsSet, target) queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource) if err != nil { logger.Error(ctx, "the append target generate redis query key identifier failed", "target", target, "error", err) continue } pollingConfig := redisPollingConfig{ targetID: target, queryKey: queryKey, interval: targetContext.interval, dataSize: int64(targetContext.measurement.Size), } go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan) logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", targetContext.interval) } allKeys := util.GetKeysFromSet(appendTargetsSet) if len(allKeys) > 0 { logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys)) clear(appendTargetsSet) } } // removeTargets define func to stops running polling goroutines for targets that were removed func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, removeTargets []string) { for _, target := range removeTargets { targetContext, exist := config.targetContext[target] if !exist { logger.Warn(ctx, "removeTarget does not exist in targetContext map, skipping remove operation", "target", target) continue } stopChan, exists := stopChanMap[target] if !exists { logger.Warn(ctx, "removeTarget was not running, skipping remove operation", "target", target) continue } close(stopChan) delete(stopChanMap, target) delete(config.targetContext, target) interval := targetContext.interval measurementTargets, mesExist := config.measurements[interval] if !mesExist { logger.Warn(ctx, "targetContext exists but measurements is missing, cannot perform remove operation", "interval", interval, "target", target) continue } measurementTargets = util.RemoveTargetsFromSliceSimple(measurementTargets, []string{target}) config.measurements[interval] = measurementTargets logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) } } // stopAllPolling stops all running query goroutines for a specific client func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) { for target, stopChan := range stopChanMap { logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", target)) close(stopChan) } clear(stopChanMap) return } // redisPollingConfig define struct for param which query real time data from redis type redisPollingConfig struct { targetID string queryKey string interval string dataSize int64 } func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) { // TODO 测试log,后续可删除 logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize) duration, err := time.ParseDuration(config.interval) if err != nil { logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err) return } ticker := time.NewTicker(duration) defer ticker.Stop() client := diagram.NewRedisClient() startTimestamp := util.GenNanoTsStr() fmt.Printf("realTimeDataQueryFromRedis duration:%+v\n:", duration) fmt.Printf("realTimeDataQueryFromRedis ticker:%+v\n:", ticker) fmt.Printf("realTimeDataQueryFromRedis startTimestamp:%s\n", startTimestamp) needPerformQuery := true for { if needPerformQuery { performQuery(ctx, client, config, fanInChan) needPerformQuery = false } select { case <-ticker.C: needPerformQuery = true case <-stopChan: logger.Info(ctx, "stop the redis query goroutine via a singal") return } } } func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) { members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize) if err != nil { logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) return } pullDatas := make([]network.RealTimePullData, 0, len(members)) for _, member := range members { pullDatas = append(pullDatas, network.RealTimePullData{ Time: member.Member.(string), Value: member.Score, }) } sortPullDataByTimeAscending(ctx, pullDatas) targetData := network.RealTimePullTarget{ ID: config.targetID, Datas: pullDatas, } select { case fanInChan <- targetData: default: // TODO[BACKPRESSURE-ISSUE] 考虑 fanInChan 阻塞,当出现大量数据阻塞查询循环并丢弃时,采取背压方式解决问题 #1 logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members)) } } func sortPullDataByTimeAscending(ctx context.Context, data []network.RealTimePullData) { sort.Slice(data, func(i, j int) bool { t1, err1 := strconv.ParseInt(data[i].Time, 10, 64) if err1 != nil { logger.Error(ctx, "parsing real time data timestamp failed", "index", i, "time", data[i].Time, "error", err1) return false } t2, err2 := strconv.ParseInt(data[j].Time, 10, 64) if err2 != nil { logger.Error(ctx, "parsing real time data timestamp failed", "index", j, "time", data[j].Time, "error", err2) return true } return t1 < t2 }) }