From a82e02126ddc744c7c559a859a1321f12c51a876 Mon Sep 17 00:00:00 2001
From: douxu
Date: Tue, 11 Nov 2025 17:37:06 +0800
Subject: [PATCH] extract duplicate code snippets to form a common function

---
 diagram/redis_client.go                |   2 +-
 handler/real_time_data_pull.go         |  94 ++++++--
 handler/real_time_data_subscription.go | 320 +++++++++++++------------
 3 files changed, 242 insertions(+), 174 deletions(-)

diff --git a/diagram/redis_client.go b/diagram/redis_client.go
index 6d7bf1a..c336fe7 100644
--- a/diagram/redis_client.go
+++ b/diagram/redis_client.go
@@ -7,7 +7,7 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-// RedisClient define struct to create redis client
+// RedisClient define struct to access redis data that does not require the use of distributed locks
 type RedisClient struct {
 	Client *redis.Client
 }
diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go
index e386b0a..2e4a298 100644
--- a/handler/real_time_data_pull.go
+++ b/handler/real_time_data_pull.go
@@ -127,14 +127,16 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
 	defer close(fanInChan)
 	stopChanMap := make(map[string]chan struct{})
 
-	s.mutex.RLock()
-
+	s.globalMutex.RLock()
 	config, confExist := s.monitorMap[clientID]
 	if !confExist {
 		logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
+		s.globalMutex.RUnlock()
 		return
 	}
+	s.globalMutex.RUnlock()
 
+	config.mutex.RLock()
 	for interval, componentItems := range config.components {
 		for _, target := range componentItems.targets {
 			// add a secondary check to prevent the target from already existing in the stopChanMap
@@ -158,16 +160,16 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
 				continue
 			}
 
-			config := redisPollingConfig{
+			pollingConfig := redisPollingConfig{
 				targetID: target,
 				queryKey: queryKey,
 				interval: interval,
 				dataSize: int64(measurement.Size),
 			}
-			go realTimeDataQueryFromRedis(ctx, config, fanInChan, queryGStopChan)
+			go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
 		}
 	}
-	s.mutex.RUnlock()
+	config.mutex.RUnlock()
 
 	for {
 		select {
@@ -177,19 +179,14 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
 				stopAllPolling(ctx, stopChanMap)
 				return
 			}
+			config.mutex.Lock()
 			switch transportTargets.OperationType {
 			case constants.OpAppend:
-				// TODO 考虑精细化锁结果,将RW锁置于ClientID层面之下
-				s.mutex.Lock()
-				defer s.mutex.Unlock()
-				// TODO 增加 append 函数调用
-				fmt.Println(transportTargets.Targets)
+				appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
 			case constants.OpRemove:
-				s.mutex.Lock()
-				defer s.mutex.Unlock()
-				// TODO 增加 remove 函数调用
-				fmt.Println(transportTargets.Targets)
+				removeTargets(ctx, stopChanMap, transportTargets.Targets)
 			}
+			config.mutex.Unlock()
 		case <-ctx.Done():
 			logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
 			stopAllPolling(ctx, stopChanMap)
@@ -198,6 +195,75 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
 	}
 }
 
+// appendTargets starts new polling goroutines for targets that were just added
+func appendTargets(ctx context.Context, config *RealTimeMonitorConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
+	targetSet := make(map[string]struct{}, len(appendTargets))
+	for _, target := range appendTargets {
+		targetSet[target] = struct{}{}
+	}
+
+	for interval, componentItems := range config.components {
+		for _, target := range componentItems.targets {
+			if _, needsToAdd := targetSet[target]; !needsToAdd {
+				continue
+			}
+
+			if _, exists := stopChanMap[target]; exists {
+				logger.Warn(ctx, "append target already running, skipping", "target", target)
+				continue
+			}
+
+			measurement, exist := componentItems.targetParam[target]
+			if !exist {
+				logger.Error(ctx, "append target can not find measurement params for new target", "target", target)
+				continue
+			}
+
+			queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
+			if err != nil {
+				logger.Error(ctx, "append target generate measurement identifier failed", "target", target, "error", err)
+				continue
+			}
+
+			pollingConfig := redisPollingConfig{
+				targetID: target,
+				queryKey: queryKey,
+				interval: interval,
+				dataSize: int64(measurement.Size),
+			}
+
+			queryGStopChan := make(chan struct{})
+			stopChanMap[target] = queryGStopChan
+
+			go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
+
+			logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", interval)
+
+			delete(targetSet, target)
+		}
+	}
+
+	for target := range targetSet {
+		logger.Error(ctx, "append target: failed to find config for target, goroutine not started", "target", target)
+	}
+}
+
+// removeTargets define func to stop running polling goroutines for targets that were removed
+func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, targetsToRemove []string) {
+	for _, target := range targetsToRemove {
+		stopChan, exists := stopChanMap[target]
+		if !exists {
+			logger.Warn(ctx, "removeTarget was not running, skipping", "target", target)
+			continue
+		}
+
+		close(stopChan)
+		delete(stopChanMap, target)
+		logger.Info(ctx, "stopped polling goroutine for removed target", "target", target)
+	}
+}
+
+// stopAllPolling stops all running query goroutines for a specific client
 func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
 	for target, stopChan := range stopChanMap {
 		logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", target))
diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go
index 6c102cb..2b147df 100644
--- a/handler/real_time_data_subscription.go
+++ b/handler/real_time_data_subscription.go
@@ -4,6 +4,7 @@ package handler
 import (
 	"context"
 	"fmt"
+	"maps"
 	"net/http"
 	"sync"
 
@@ -77,7 +78,7 @@ func init() {
 func RealTimeSubHandler(c *gin.Context) {
 	var request network.RealTimeSubRequest
 	var monitorAction string
-	var monitorID string
+	var clientID string
 
 	if err := c.ShouldBindJSON(&request); err != nil {
 		logger.Error(c, "failed to unmarshal real time query request", "error", err)
@@ -99,13 +100,13 @@ func RealTimeSubHandler(c *gin.Context) {
 			})
 			return
 		}
-		monitorID = id.String()
+		clientID = id.String()
 	} else if request.Action == constants.MonitorStartAction && request.ClientID != "" {
 		monitorAction = constants.MonitorAppendAction
-		monitorID = request.ClientID
+		clientID = request.ClientID
 	} else if request.Action == constants.MonitorStopAction && request.ClientID != "" {
 		monitorAction = request.Action
-		monitorID = request.ClientID
+		clientID = request.ClientID
 	}
 
 	pgClient := database.GetPostgresDBClient()
@@ -115,14 +116,14 @@ func RealTimeSubHandler(c *gin.Context) {
 
 	switch monitorAction {
 	case constants.MonitorStartAction:
-		results, err := globalMonitorState.CreateConfig(c, tx, monitorID, request.Components)
+		results, err := globalMonitorState.CreateConfig(c, tx, clientID, request.Components)
 		if err != nil {
 			logger.Error(c, "create real time data monitor config failed", "error", err)
 			c.JSON(http.StatusOK, network.FailureResponse{
 				Code: http.StatusBadRequest,
 				Msg:  err.Error(),
 				PayLoad: network.RealTimeSubPayload{
-					ClientID:      monitorID,
+					ClientID:      clientID,
 					TargetResults: results,
 				},
 			})
@@ -133,20 +134,20 @@ func RealTimeSubHandler(c *gin.Context) {
 			Code: http.StatusOK,
 			Msg:  "success",
 			PayLoad: network.RealTimeSubPayload{
-				ClientID:      monitorID,
+				ClientID:      clientID,
 				TargetResults: results,
 			},
 		})
 		return
 	case constants.MonitorStopAction:
-		results, err := globalMonitorState.RemoveTargets(c, monitorID, request.Components)
+		results, err := globalMonitorState.RemoveTargets(c, clientID, request.Components)
 		if err != nil {
 			logger.Error(c, "remove target to real time data monitor config failed", "error", err)
 			c.JSON(http.StatusOK, network.FailureResponse{
 				Code: http.StatusBadRequest,
 				Msg:  err.Error(),
 				PayLoad: network.RealTimeSubPayload{
-					ClientID:      monitorID,
+					ClientID:      clientID,
 					TargetResults: results,
 				},
 			})
@@ -157,20 +158,20 @@ func RealTimeSubHandler(c *gin.Context) {
 			Code: http.StatusOK,
 			Msg:  "success",
 			PayLoad: network.RealTimeSubPayload{
-				ClientID:      monitorID,
+				ClientID:      clientID,
 				TargetResults: results,
 			},
 		})
 		return
 	case constants.MonitorAppendAction:
-		results, err := globalMonitorState.AppendTargets(c, tx, monitorID, request.Components)
+		results, err := globalMonitorState.AppendTargets(c, tx, clientID, request.Components)
 		if err != nil {
 			logger.Error(c, "append target to real time data monitor config failed", "error", err)
 			c.JSON(http.StatusOK, network.FailureResponse{
 				Code: http.StatusBadRequest,
 				Msg:  err.Error(),
 				PayLoad: network.RealTimeSubPayload{
-					ClientID:      monitorID,
+					ClientID:      clientID,
 					TargetResults: results,
 				},
 			})
@@ -181,7 +182,7 @@ func RealTimeSubHandler(c *gin.Context) {
 			Code: http.StatusOK,
 			Msg:  "success",
 			PayLoad: network.RealTimeSubPayload{
-				ClientID:      monitorID,
+				ClientID:      clientID,
 				TargetResults: results,
 			},
 		})
@@ -195,7 +196,7 @@ func RealTimeSubHandler(c *gin.Context) {
 				Code: http.StatusBadRequest,
 				Msg:  err.Error(),
 				PayLoad: network.RealTimeSubPayload{
-					ClientID:      monitorID,
+					ClientID:      clientID,
 					TargetResults: results,
 				},
 			})
@@ -212,13 +213,14 @@ type RealTimeMonitorComponent struct {
 // RealTimeMonitorConfig define struct of real time monitor config
 type RealTimeMonitorConfig struct {
 	noticeChan chan *transportTargets
+	mutex      sync.RWMutex
 	components map[string]*RealTimeMonitorComponent
 }
 
 // SharedMonitorState define struct of shared monitor state with mutex
 type SharedMonitorState struct {
-	monitorMap map[string]*RealTimeMonitorConfig
-	mutex      sync.RWMutex
+	monitorMap  map[string]*RealTimeMonitorConfig
+	globalMutex sync.RWMutex
 }
 
 // NewSharedMonitorState define function to create new SharedMonitorState
@@ -228,31 +230,22 @@ func NewSharedMonitorState() *SharedMonitorState {
 	}
 }
 
-// CreateConfig define function to create config in SharedMonitorState
-func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	requestTargetsCount := processRealTimeRequestCount(components)
-	targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
-
-	if _, exist := s.monitorMap[monitorID]; exist {
-		err := fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID)
-		logger.Error(ctx, "monitorID already exists. Use AppendTargets to modify existing config", "error", err)
-		return processRealTimeRequestTargets(components, requestTargetsCount, err), err
-	}
-
-	config := &RealTimeMonitorConfig{
-		noticeChan: make(chan *transportTargets),
-		components: make(map[string]*RealTimeMonitorComponent),
-	}
+// processAndValidateTargets define func to perform all database I/O operations in a lock-free state (e.g. ParseDataIdentifierToken)
+func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) (
+	[]network.TargetResult,
+	map[string]*RealTimeMonitorComponent,
+	[]string,
+) {
+	targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum)
+	newComponentsMap := make(map[string]*RealTimeMonitorComponent)
+	successfulTargets := make([]string, 0, allReqTargetNum)
 
 	for _, componentItem := range components {
 		interval := componentItem.Interval
+
 		for _, target := range componentItem.Targets {
 			var targetResult network.TargetResult
 			targetResult.ID = target
-
 			targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
 			if err != nil {
 				logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
@@ -261,48 +254,94 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
 				targetProcessResults = append(targetProcessResults, targetResult)
 				continue
 			}
-
 			targetResult.Code = constants.SubSuccessCode
 			targetResult.Msg = constants.SubSuccessMsg
 			targetProcessResults = append(targetProcessResults, targetResult)
-			if comp, ok := config.components[interval]; !ok {
-				targets := make([]string, 0, len(componentItem.Targets))
-				targetParam := make(map[string]*orm.Measurement)
-				targetParam[target] = targetModel.GetMeasurementInfo()
-				config.components[interval] = &RealTimeMonitorComponent{
-					targets:     append(targets, target),
-					targetParam: targetParam,
+			successfulTargets = append(successfulTargets, target)
+
+			if _, ok := newComponentsMap[interval]; !ok {
+				newComponentsMap[interval] = &RealTimeMonitorComponent{
+					targets:     make([]string, 0, len(componentItem.Targets)),
+					targetParam: make(map[string]*orm.Measurement),
 				}
-			} else {
-				comp.targets = append(comp.targets, target)
-				comp.targetParam[target] = targetModel.GetMeasurementInfo()
 			}
+
+			comp := newComponentsMap[interval]
+			comp.targets = append(comp.targets, target)
+			comp.targetParam[target] = targetModel.GetMeasurementInfo()
 		}
 	}
+	return targetProcessResults, newComponentsMap, successfulTargets
+}
 
-	s.monitorMap[monitorID] = config
+// mergeComponents define func to merge newComponentsMap into existingComponentsMap
+func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, newComponents map[string]*RealTimeMonitorComponent) {
+	for interval, newComp := range newComponents {
+		if existingComp, ok := existingComponents[interval]; ok {
+			existingComp.targets = append(existingComp.targets, newComp.targets...)
+			maps.Copy(existingComp.targetParam, newComp.targetParam)
+		} else {
+			existingComponents[interval] = newComp
+		}
+	}
+}
+
+// CreateConfig define function to create config in SharedMonitorState
+func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
+	requestTargetsCount := processRealTimeRequestCount(components)
+	targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
+
+	s.globalMutex.Lock()
+	if _, exist := s.monitorMap[clientID]; exist {
+		s.globalMutex.Unlock()
+		err := fmt.Errorf("clientID %s already exists. use AppendTargets to modify existing config", clientID)
+		logger.Error(ctx, "clientID already exists. use AppendTargets to modify existing config", "error", err)
+		return targetProcessResults, err
+	}
+
+	config := &RealTimeMonitorConfig{
+		noticeChan: make(chan *transportTargets),
+		components: newComponentsMap, // use the pre-built components map directly
+	}
+	s.monitorMap[clientID] = config
+	s.globalMutex.Unlock()
 	return targetProcessResults, nil
 }
 
 // AppendTargets define function to append targets in SharedMonitorState
-func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
+func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
 	requestTargetsCount := processRealTimeRequestCount(components)
 	targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
 
-	config, exist := s.monitorMap[monitorID]
+	s.globalMutex.RLock()
+	config, exist := s.monitorMap[clientID]
 	if !exist {
-		err := fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID)
-		logger.Error(ctx, "monitorID not found. Use CreateConfig to start a new config", "error", err)
+		s.globalMutex.RUnlock()
+		err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
+		logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
 		return processRealTimeRequestTargets(components, requestTargetsCount, err), err
 	}
+	s.globalMutex.RUnlock()
+
+	targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
+
+	config.mutex.Lock()
+	mergeComponents(config.components, newComponentsMap)
+	config.mutex.Unlock()
+
+	if len(successfulTargets) > 0 {
+		transportTargets := &transportTargets{
+			OperationType: constants.OpAppend,
+			Targets:       successfulTargets,
+		}
+		config.noticeChan <- transportTargets
+	}
 
 	transportTargets := &transportTargets{
 		OperationType: constants.OpAppend,
 		Targets:       make([]string, requestTargetsCount),
 	}
+	config.mutex.Lock()
 	for _, componentItem := range components {
 		interval := componentItem.Interval
 		for _, target := range componentItem.Targets {
@@ -335,141 +374,84 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, mon
 			transportTargets.Targets = append(transportTargets.Targets, target)
 		}
 	}
+	config.mutex.Unlock()
 	config.noticeChan <- transportTargets
 	return targetProcessResults, nil
 }
 
 // UpsertTargets define function to upsert targets in SharedMonitorState
-func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	config, exist := s.monitorMap[monitorID]
-	if !exist {
-		// create new config
-		config = &RealTimeMonitorConfig{
-			noticeChan: make(chan *transportTargets),
-			components: make(map[string]*RealTimeMonitorComponent),
-		}
-
-		targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components))
-		for _, componentItem := range components {
-			interval := componentItem.Interval
-			for _, target := range componentItem.Targets {
-				var targetResult network.TargetResult
-				targetResult.ID = target
-
-				targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
-				if err != nil {
-					logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
-					targetResult.Code = constants.SubFailedCode
-					targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
-					targetProcessResults = append(targetProcessResults, targetResult)
-					continue
-				}
-
-				targetResult.Code = constants.SubSuccessCode
-				targetResult.Msg = constants.SubSuccessMsg
-				if comp, ok := config.components[interval]; !ok {
-					targets := make([]string, 0, len(componentItem.Targets))
-					targetParam := make(map[string]*orm.Measurement)
-					targetParam[target] = targetModel.GetMeasurementInfo()
-					config.components[interval] = &RealTimeMonitorComponent{
-						targets: append(targets, target),
-					}
-				} else {
-					comp.targets = append(comp.targets, target)
-					comp.targetParam[target] = targetModel.GetMeasurementInfo()
-				}
-			}
-		}
-		s.monitorMap[monitorID] = config
-		return targetProcessResults, nil
-	}
-
+func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
 	requestTargetsCount := processRealTimeRequestCount(components)
-	targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
-	transportTargets := &transportTargets{
-		OperationType: constants.OpUpdate,
-		Targets:       make([]string, requestTargetsCount),
-	}
-	for _, componentItem := range components {
-		interval := componentItem.Interval
-		for _, target := range componentItem.Targets {
-			var targetResult network.TargetResult
-			targetResult.ID = target
+	targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
 
-			targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
-			if err != nil {
-				logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
-				targetResult.Code = constants.SubFailedCode
-				targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
-				targetProcessResults = append(targetProcessResults, targetResult)
-				continue
-			}
+	s.globalMutex.RLock()
+	config, exist := s.monitorMap[clientID]
+	s.globalMutex.RUnlock()
 
-			targetResult.Code = constants.SubSuccessCode
-			targetResult.Msg = constants.SubSuccessMsg
-			targetProcessResults = append(targetProcessResults, targetResult)
-			if comp, ok := config.components[interval]; !ok {
-				targets := make([]string, 0, len(componentItem.Targets))
-				targetParam := make(map[string]*orm.Measurement)
-				targetParam[target] = targetModel.GetMeasurementInfo()
-				config.components[interval] = &RealTimeMonitorComponent{
-					targets: append(targets, target),
-				}
-			} else {
-				comp.targets = append(comp.targets, target)
-				comp.targetParam[target] = targetModel.GetMeasurementInfo()
+	var opType constants.TargetOperationType
+	if exist {
+		opType = constants.OpUpdate
+		config.mutex.Lock()
+		mergeComponents(config.components, newComponentsMap)
+		config.mutex.Unlock()
+	} else {
+		opType = constants.OpAppend
+		s.globalMutex.Lock()
+		if config, exist = s.monitorMap[clientID]; !exist {
+			config = &RealTimeMonitorConfig{
+				noticeChan: make(chan *transportTargets),
+				components: newComponentsMap,
 			}
-			transportTargets.Targets = append(transportTargets.Targets, target)
+			s.monitorMap[clientID] = config
+		} else {
+			// config was created concurrently by another caller; merge into it before releasing the global lock
+			config.mutex.Lock()
+			mergeComponents(config.components, newComponentsMap)
+			config.mutex.Unlock()
 		}
+		s.globalMutex.Unlock()
 	}
-	config.noticeChan <- transportTargets
 
+	if len(successfulTargets) > 0 {
+		transportTargets := &transportTargets{
+			OperationType: opType,
+			Targets:       successfulTargets,
+		}
+		config.noticeChan <- transportTargets
+	}
 	return targetProcessResults, nil
 }
 
-// Get define function to get subscriptions config from SharedMonitorState
-func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) {
-	s.mutex.RLock()
-	defer s.mutex.RUnlock()
-
-	config, ok := s.monitorMap[clientID]
-	if !ok {
-		return nil, false
-	}
-	return config, true
-}
-
 // RemoveTargets define function to remove targets in SharedMonitorState
-func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
+func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
 	requestTargetsCount := processRealTimeRequestCount(components)
 	targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
 
-	config, exist := s.monitorMap[monitorID]
+	s.globalMutex.RLock()
+	config, exist := s.monitorMap[clientID]
 	if !exist {
-		err := fmt.Errorf("monitorID %s not found", monitorID)
-		logger.Error(ctx, "monitorID not found in remove targets operation", "error", err)
+		s.globalMutex.RUnlock()
+		err := fmt.Errorf("clientID %s not found", clientID)
+		logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
 		return processRealTimeRequestTargets(components, requestTargetsCount, err), err
 	}
+	s.globalMutex.RUnlock()
 
+	var shouldRemoveClient bool
 	// components is the list of items to be removed passed in the request
 	transportTargets := &transportTargets{
 		OperationType: constants.OpRemove,
-		Targets:       make([]string, requestTargetsCount),
+		Targets:       make([]string, 0, requestTargetsCount),
 	}
+	config.mutex.Lock()
 	for _, compent := range components {
 		interval := compent.Interval
 		// comp is the locally running listener configuration
 		comp, compExist := config.components[interval]
 		if !compExist {
-			logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval)
-			for _, target := range comp.targets {
+			logger.Error(ctx, fmt.Sprintf("component with interval %s not found under clientID %s", interval, clientID), "clientID", clientID, "interval", interval)
+			for _, target := range compent.Targets {
 				targetResult := network.TargetResult{
 					ID:   target,
 					Code: constants.CancelSubFailedCode,
@@ -508,11 +490,11 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
 		}
 
 		if len(config.components) == 0 {
-			delete(s.monitorMap, monitorID)
+			shouldRemoveClient = true
 		}
 
 		if len(targetsToRemoveMap) > 0 {
-			err := fmt.Errorf("target remove were not found under monitorID %s and interval %s", monitorID, interval)
+			err := fmt.Errorf("targets to remove were not found under clientID %s and interval %s", clientID, interval)
 			for target := range targetsToRemoveMap {
 				targetResult := network.TargetResult{
 					ID: target,
@@ -523,12 +505,32 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
 			}
 		}
 	}
-
+	config.mutex.Unlock()
 	// pass the removed subscription configuration to the notice channel
 	config.noticeChan <- transportTargets
+
+	if shouldRemoveClient {
+		s.globalMutex.Lock()
+		if currentConfig, exist := s.monitorMap[clientID]; exist && len(currentConfig.components) == 0 {
+			delete(s.monitorMap, clientID)
+		}
+		s.globalMutex.Unlock()
+	}
 
 	return targetProcessResults, nil
 }
 
+// Get define function to get subscriptions config from SharedMonitorState
+func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) {
+	s.globalMutex.RLock()
+	defer s.globalMutex.RUnlock()
+
+	config, ok := s.monitorMap[clientID]
+	if !ok {
+		return nil, false
+	}
+	return config, true
+}
+
 // TODO 增加一个update 函数用来更新 interval
 func processRealTimeRequestCount(components []network.RealTimeComponentItem) int {
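
Note: the patch replaces the single SharedMonitorState mutex with two levels of locking: globalMutex guards only the monitorMap lookup/insert/delete, while each RealTimeMonitorConfig carries its own mutex for its components. The sketch below is a minimal, self-contained illustration of that pattern, not the project's actual API; registry, clientConfig, create, and appendTargets are hypothetical stand-in names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// clientConfig mirrors the idea of RealTimeMonitorConfig: its mutex only
// guards this client's data, not the whole registry.
type clientConfig struct {
	mu      sync.RWMutex
	targets map[string]struct{}
}

// registry mirrors SharedMonitorState: globalMu only guards the map of
// clients, never the contents of an individual clientConfig.
type registry struct {
	globalMu sync.RWMutex
	clients  map[string]*clientConfig
}

func newRegistry() *registry {
	return &registry{clients: make(map[string]*clientConfig)}
}

// create inserts a new client entry under the global write lock.
func (r *registry) create(clientID string) (*clientConfig, error) {
	r.globalMu.Lock()
	defer r.globalMu.Unlock()
	if _, ok := r.clients[clientID]; ok {
		return nil, fmt.Errorf("clientID %s already exists", clientID)
	}
	c := &clientConfig{targets: make(map[string]struct{})}
	r.clients[clientID] = c
	return c, nil
}

// appendTargets holds the global read lock only long enough to find the
// client, then mutates that client's data under its own lock.
func (r *registry) appendTargets(clientID string, targets ...string) error {
	r.globalMu.RLock()
	c, ok := r.clients[clientID]
	r.globalMu.RUnlock()
	if !ok {
		return fmt.Errorf("clientID %s not found", clientID)
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	for _, t := range targets {
		c.targets[t] = struct{}{}
	}
	return nil
}

func main() {
	r := newRegistry()
	if _, err := r.create("client-1"); err != nil {
		panic(err)
	}

	// Concurrent writers to the same client contend only on that client's
	// lock; writers for other clients would not block each other at all.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = r.appendTargets("client-1", fmt.Sprintf("target-%d", i))
		}(i)
	}
	wg.Wait()

	r.globalMu.RLock()
	c := r.clients["client-1"]
	r.globalMu.RUnlock()
	c.mu.RLock()
	fmt.Println("targets registered:", len(c.targets))
	c.mu.RUnlock()
}
```

As in the patch, no code path here acquires the global lock while already holding a per-client lock, so the lock ordering stays deadlock-free even though the two levels are used together.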