From 6f3134b5e9fc4da2e7ce7bb6273c3854654890c8 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 26 Nov 2025 17:49:24 +0800 Subject: [PATCH] optimize struct of real time data subscription api and fix bug of real time data pull api --- constants/subscription.go | 5 + deploy/redis-test-data/data_injection.go | 2 +- handler/real_time_data_pull.go | 131 ++++++++------- handler/real_time_data_subscription.go | 205 ++++++++++------------- network/real_time_data_request.go | 10 +- util/map.go | 11 ++ util/string.go | 18 ++ 7 files changed, 204 insertions(+), 178 deletions(-) create mode 100644 util/map.go create mode 100644 util/string.go diff --git a/constants/subscription.go b/constants/subscription.go index a71a069..ca379d2 100644 --- a/constants/subscription.go +++ b/constants/subscription.go @@ -44,3 +44,8 @@ const ( // OpUpdate define update exist target from the subscription list OpUpdate ) + +const ( + // NoticeChanCap define real time data notice channel capacity + NoticeChanCap = 10000 +) diff --git a/deploy/redis-test-data/data_injection.go b/deploy/redis-test-data/data_injection.go index 01008cd..c8f918a 100644 --- a/deploy/redis-test-data/data_injection.go +++ b/deploy/redis-test-data/data_injection.go @@ -611,7 +611,7 @@ func generateRandomData(baseValue float64, changes []float64, size int) []float6 // simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, config outlierConfig, measInfo calculationResult) { log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", redisKey, measInfo.BaseValue, measInfo.Changes) - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() pipe := rdb.Pipeline() diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 05da5dc..9ca221d 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -82,7 +82,7 @@ func PullRealTimeDataHandler(c *gin.Context) { if len(buffer) >= bufferMaxSize { // buffer is full, send immediately if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { - logger.Error(nil, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) + logger.Error(ctx, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) return } // reset buffer @@ -94,7 +94,7 @@ func PullRealTimeDataHandler(c *gin.Context) { if len(buffer) > 0 { // when the ticker is triggered, all data in the send buffer is sent if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { - logger.Error(nil, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) + logger.Error(ctx, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) return } // reset buffer @@ -103,7 +103,7 @@ func PullRealTimeDataHandler(c *gin.Context) { case <-ctx.Done(): // send the last remaining data if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { - logger.Error(nil, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err) + logger.Error(ctx, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err) } logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID) return @@ -171,27 +171,27 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.components) config.mutex.RLock() - for interval, componentItems := range config.components { - logger.Info(ctx, fmt.Sprintf("interval %s len of componentItems:%d\n", interval, len(componentItems.targets))) - for _, target := range componentItems.targets { + for interval, measurementTargets := range config.measurements { + for _, target := range measurementTargets { // add a secondary check to prevent the target from already existing in the stopChanMap if _, exists := stopChanMap[target]; exists { logger.Warn(ctx, "target already exists in polling map, skipping start-up", "target", target) continue } - measurement, exist := componentItems.targetParam[target] + targetContext, exist := config.targetContext[target] if !exist { logger.Error(ctx, "can not found subscription node param into param map", "target", target) continue } + measurementInfo := targetContext.measurement queryGStopChan := make(chan struct{}) // store stop channel with target into map stopChanMap[target] = queryGStopChan - queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) + queryKey, err := model.GenerateMeasureIdentifier(measurementInfo.DataSource) if err != nil { - logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err) + logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurementInfo.DataSource, "error", err) continue } @@ -199,7 +199,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin targetID: target, queryKey: queryKey, interval: interval, - dataSize: int64(measurement.Size), + dataSize: int64(measurementInfo.Size), } go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan) } @@ -219,7 +219,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin case constants.OpAppend: appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) case constants.OpRemove: - removeTargets(ctx, stopChanMap, transportTargets.Targets) + removeTargets(ctx, config, stopChanMap, transportTargets.Targets) } config.mutex.Unlock() case <-ctx.Done(): @@ -232,68 +232,87 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin // appendTargets starts new polling goroutines for targets that were just added func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) { - targetSet := make(map[string]struct{}, len(appendTargets)) + appendTargetsSet := make(map[string]struct{}, len(appendTargets)) for _, target := range appendTargets { - targetSet[target] = struct{}{} + appendTargetsSet[target] = struct{}{} } - for interval, componentItems := range config.components { - for _, target := range componentItems.targets { - if _, needsToAdd := targetSet[target]; !needsToAdd { - continue - } - - if _, exists := stopChanMap[target]; exists { - logger.Warn(ctx, "append target already running, skipping", "target", target) - continue - } - - measurement, exist := componentItems.targetParam[target] - if !exist { - logger.Error(ctx, "append target can not find measurement params for new target", "target", target) - continue - } - - queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) - if err != nil { - logger.Error(ctx, "append target generate measurement identifier failed", "target", target, "error", err) - continue - } - - pollingConfig := redisPollingConfig{ - targetID: target, - queryKey: queryKey, - interval: interval, - dataSize: int64(measurement.Size), - } - - queryGStopChan := make(chan struct{}) - stopChanMap[target] = queryGStopChan - - go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan) - - logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", interval) - - delete(targetSet, target) + for _, target := range appendTargets { + targetContext, exists := config.targetContext[target] + if exists { + logger.Warn(ctx, "the append target already exists in the real time data fetch process,skipping the startup step", "target", target) + continue } + + if _, exists := stopChanMap[target]; exists { + logger.Warn(ctx, "the append target already has a stop channel, skipping the startup step", "target", target) + continue + } + + queryGStopChan := make(chan struct{}) + stopChanMap[target] = queryGStopChan + + interval := targetContext.interval + measurementTargets, ok := config.measurements[interval] + if !ok { + logger.Error(ctx, "targetContext exists but measurements is missing, cannot update config", "target", target, "interval", interval) + continue + } + measurementTargets = append(measurementTargets, target) + config.targetContext[target] = targetContext + delete(appendTargetsSet, target) + + queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource) + if err != nil { + logger.Error(ctx, "the append target generate redis query key identifier failed", "target", target, "error", err) + continue + } + pollingConfig := redisPollingConfig{ + targetID: target, + queryKey: queryKey, + interval: targetContext.interval, + dataSize: int64(targetContext.measurement.Size), + } + go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan) + + logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", targetContext.interval) } - for target := range targetSet { - logger.Error(ctx, "append target: failed to find config for target, goroutine not started", "target", target) + allKeys := util.GetKeysFromSet(appendTargetsSet) + if len(allKeys) > 0 { + logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys)) + clear(appendTargetsSet) } } // removeTargets define func to stops running polling goroutines for targets that were removed -func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, targetsToRemove []string) { - for _, target := range targetsToRemove { +func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, removeTargets []string) { + for _, target := range removeTargets { + targetContext, exist := config.targetContext[target] + if !exist { + logger.Warn(ctx, "removeTarget does not exist in targetContext map, skipping remove operation", "target", target) + continue + } + stopChan, exists := stopChanMap[target] if !exists { - logger.Warn(ctx, "removeTarget was not running, skipping", "target", target) + logger.Warn(ctx, "removeTarget was not running, skipping remove operation", "target", target) continue } close(stopChan) delete(stopChanMap, target) + delete(config.targetContext, target) + + interval := targetContext.interval + measurementTargets, mesExist := config.measurements[interval] + if !mesExist { + logger.Warn(ctx, "targetContext exists but measurements is missing, cannot perform remove operation", "interval", interval, "target", target) + continue + } + measurementTargets = util.RemoveTargetsFromSliceSimple(measurementTargets, []string{target}) + config.measurements[interval] = measurementTargets + logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) } } diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index d8c32be..ecf5c78 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -90,6 +90,8 @@ func RealTimeSubHandler(c *gin.Context) { } if request.Action == constants.SubStartAction && request.ClientID == "" { + // TODO 调试输出,待删除 + fmt.Println("00000") subAction = request.Action id, err := uuid.NewV4() if err != nil { @@ -102,6 +104,8 @@ func RealTimeSubHandler(c *gin.Context) { } clientID = id.String() } else if request.Action == constants.SubStartAction && request.ClientID != "" { + // TODO 调试输出,待删除 + fmt.Println("11111") subAction = constants.SubAppendAction clientID = request.ClientID } else if request.Action == constants.SubStopAction && request.ClientID != "" { @@ -116,7 +120,7 @@ func RealTimeSubHandler(c *gin.Context) { switch subAction { case constants.SubStartAction: - results, err := globalSubState.CreateConfig(c, tx, clientID, request.Components) + results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements) if err != nil { logger.Error(c, "create real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ @@ -140,7 +144,7 @@ func RealTimeSubHandler(c *gin.Context) { }) return case constants.SubStopAction: - results, err := globalSubState.RemoveTargets(c, clientID, request.Components) + results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements) if err != nil { logger.Error(c, "remove target to real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ @@ -164,7 +168,9 @@ func RealTimeSubHandler(c *gin.Context) { }) return case constants.SubAppendAction: - results, err := globalSubState.AppendTargets(c, tx, clientID, request.Components) + // TODO 调试输出,待删除 + fmt.Println("22222") + results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements) if err != nil { logger.Error(c, "append target to real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ @@ -190,8 +196,8 @@ func RealTimeSubHandler(c *gin.Context) { default: err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action) logger.Error(c, "unsupported action of real time data subscription request", "error", err) - requestTargetsCount := processRealTimeRequestCount(request.Components) - results := processRealTimeRequestTargets(request.Components, requestTargetsCount, err) + requestTargetsCount := processRealTimeRequestCount(request.Measurements) + results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -204,17 +210,23 @@ func RealTimeSubHandler(c *gin.Context) { } } -// RealTimeSubComponent define struct of real time subscription component -type RealTimeSubComponent struct { - targets []string - targetParam map[string]*orm.Measurement +// RealTimeSubMeasurement define struct of real time subscription measurement +type RealTimeSubMeasurement struct { + targets []string +} + +// TargetPollingContext define struct of real time pulling reverse context +type TargetPollingContext struct { + interval string + measurement *orm.Measurement } // RealTimeSubConfig define struct of real time subscription config type RealTimeSubConfig struct { - noticeChan chan *transportTargets - mutex sync.RWMutex - components map[string]*RealTimeSubComponent + noticeChan chan *transportTargets + mutex sync.RWMutex + measurements map[string][]string + targetContext map[string]*TargetPollingContext } // SharedSubState define struct of shared subscription state with mutex @@ -231,19 +243,19 @@ func NewSharedSubState() *SharedSubState { } // processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken) -func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) ( - []network.TargetResult, - map[string]*RealTimeSubComponent, - []string, +func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements []network.RealTimeMeasurementItem, allReqTargetNum int) ( + []network.TargetResult, []string, + map[string][]string, + map[string]*TargetPollingContext, ) { targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum) - newComponentsMap := make(map[string]*RealTimeSubComponent) + newMeasMap := make(map[string][]string) successfulTargets := make([]string, 0, allReqTargetNum) + newMeasContextMap := make(map[string]*TargetPollingContext) - for _, componentItem := range components { - interval := componentItem.Interval - - for _, target := range componentItem.Targets { + for _, measurementItem := range measurements { + interval := measurementItem.Interval + for _, target := range measurementItem.Targets { var targetResult network.TargetResult targetResult.ID = target targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) @@ -259,38 +271,39 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []ne targetProcessResults = append(targetProcessResults, targetResult) successfulTargets = append(successfulTargets, target) - if _, ok := newComponentsMap[interval]; !ok { - newComponentsMap[interval] = &RealTimeSubComponent{ - targets: make([]string, 0, len(componentItem.Targets)), - targetParam: make(map[string]*orm.Measurement), - } + if _, ok := newMeasMap[interval]; !ok { + newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets)) } - comp := newComponentsMap[interval] - comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo() + meas := newMeasMap[interval] + meas = append(meas, target) + newMeasContextMap[target] = &TargetPollingContext{ + interval: interval, + measurement: targetModel.GetMeasurementInfo(), + } } } - return targetProcessResults, newComponentsMap, successfulTargets + return targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap } -// mergeComponents define func to merge newComponentsMap into existingComponentsMap -func mergeComponents(existingComponents map[string]*RealTimeSubComponent, newComponents map[string]*RealTimeSubComponent) { - for interval, newComp := range newComponents { - if existingComp, ok := existingComponents[interval]; ok { - existingComp.targets = append(existingComp.targets, newComp.targets...) - maps.Copy(existingComp.targetParam, newComp.targetParam) +// mergeMeasurements define func to merge newMeasurementsMap into existingMeasurementsMap +func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) { + for interval, newMeas := range newMeasurements { + if existingMeas, ok := config.measurements[interval]; ok { + existingMeas = append(existingMeas, newMeas...) + config.measurements[interval] = existingMeas + maps.Copy(config.targetContext, newMeasurementsContextMap) } else { - existingComponents[interval] = newComp + config.measurements[interval] = newMeas } } } // CreateConfig define function to create config in SharedSubState -func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) - targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount) - + targetProcessResults, _, newMeasurementsMap, MeasurementInfos := processAndValidateTargets(ctx, tx, components, requestTargetsCount) + fmt.Println(MeasurementInfos) s.globalMutex.Lock() if _, exist := s.subMap[clientID]; exist { s.globalMutex.Unlock() @@ -300,8 +313,9 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID } config := &RealTimeSubConfig{ - noticeChan: make(chan *transportTargets), - components: newComponentsMap, // 直接使用预构建的 Map + noticeChan: make(chan *transportTargets, constants.NoticeChanCap), + measurements: newMeasurementsMap, + targetContext: MeasurementInfos, } s.subMap[clientID] = config s.globalMutex.Unlock() @@ -309,24 +323,23 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID } // AppendTargets define function to append targets in SharedSubState -func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) - targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() config, exist := s.subMap[clientID] + s.globalMutex.RUnlock() + if !exist { - s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID) logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err) return processRealTimeRequestTargets(components, requestTargetsCount, err), err } - s.globalMutex.RUnlock() - targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, components, requestTargetsCount) config.mutex.Lock() - mergeComponents(config.components, newComponentsMap) + mergeMeasurements(config, newMeasMap, newMeasContextMap) config.mutex.Unlock() if len(successfulTargets) > 0 { @@ -337,53 +350,13 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI config.noticeChan <- transportTargets } - transportTargets := &transportTargets{ - OperationType: constants.OpAppend, - Targets: make([]string, requestTargetsCount), - } - config.mutex.Lock() - for _, componentItem := range components { - interval := componentItem.Interval - for _, target := range componentItem.Targets { - var targetResult network.TargetResult - targetResult.ID = target - - targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) - if err != nil { - logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) - targetResult.Code = constants.SubFailedCode - targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) - targetProcessResults = append(targetProcessResults, targetResult) - continue - } - - targetResult.Code = constants.SubSuccessCode - targetResult.Msg = constants.SubSuccessMsg - targetProcessResults = append(targetProcessResults, targetResult) - if comp, ok := config.components[interval]; !ok { - targets := make([]string, 0, len(componentItem.Targets)) - targetParam := make(map[string]*orm.Measurement) - targetParam[target] = targetModel.GetMeasurementInfo() - config.components[interval] = &RealTimeSubComponent{ - targets: append(targets, target), - } - } else { - comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo() - } - transportTargets.Targets = append(transportTargets.Targets, target) - } - } - config.mutex.Unlock() - - config.noticeChan <- transportTargets return targetProcessResults, nil } // UpsertTargets define function to upsert targets in SharedSubState -func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { - requestTargetsCount := processRealTimeRequestCount(components) - targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) +func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { + requestTargetsCount := processRealTimeRequestCount(measurements) + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) s.globalMutex.RLock() config, exist := s.subMap[clientID] @@ -393,21 +366,21 @@ func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientI if exist { opType = constants.OpUpdate config.mutex.Lock() - mergeComponents(config.components, newComponentsMap) + mergeMeasurements(config, newMeasMap, newMeasContextMap) config.mutex.Unlock() } else { opType = constants.OpAppend s.globalMutex.Lock() if config, exist = s.subMap[clientID]; !exist { config = &RealTimeSubConfig{ - noticeChan: make(chan *transportTargets), - components: newComponentsMap, + noticeChan: make(chan *transportTargets, constants.NoticeChanCap), + measurements: newMeasMap, } s.subMap[clientID] = config } else { s.globalMutex.Unlock() config.mutex.Lock() - mergeComponents(config.components, newComponentsMap) + mergeMeasurements(config, newMeasMap, newMeasContextMap) config.mutex.Unlock() } s.globalMutex.Unlock() @@ -424,8 +397,8 @@ func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientI } // RemoveTargets define function to remove targets in SharedSubState -func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { - requestTargetsCount := processRealTimeRequestCount(components) +func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { + requestTargetsCount := processRealTimeRequestCount(measurements) targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() @@ -434,24 +407,24 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, com s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found", clientID) logger.Error(ctx, "clientID not found in remove targets operation", "error", err) - return processRealTimeRequestTargets(components, requestTargetsCount, err), err + return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err } s.globalMutex.RUnlock() var shouldRemoveClient bool - // components is the list of items to be removed passed in the request + // measurements is the list of items to be removed passed in the request transportTargets := &transportTargets{ OperationType: constants.OpRemove, Targets: make([]string, 0, requestTargetsCount), } config.mutex.Lock() - for _, compent := range components { - interval := compent.Interval - // comp is the locally running listener configuration - comp, compExist := config.components[interval] - if !compExist { - logger.Error(ctx, fmt.Sprintf("component with interval %s not found under clientID %s", interval, clientID), "clientID", clientID, "interval", interval) - for _, target := range compent.Targets { + for _, measurement := range measurements { + interval := measurement.Interval + // meas is the locally running listener configuration + measTargets, measExist := config.measurements[interval] + if !measExist { + logger.Error(ctx, fmt.Sprintf("measurement with interval %s not found under clientID %s", interval, clientID), "clientID", clientID, "interval", interval) + for _, target := range measTargets { targetResult := network.TargetResult{ ID: target, Code: constants.CancelSubFailedCode, @@ -463,12 +436,12 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, com } targetsToRemoveMap := make(map[string]struct{}) - for _, target := range compent.Targets { + for _, target := range measurement.Targets { targetsToRemoveMap[target] = struct{}{} } var newTargets []string - for _, existingTarget := range comp.targets { + for _, existingTarget := range measTargets { if _, found := targetsToRemoveMap[existingTarget]; !found { newTargets = append(newTargets, existingTarget) } else { @@ -480,16 +453,16 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, com } targetProcessResults = append(targetProcessResults, targetResult) delete(targetsToRemoveMap, existingTarget) - delete(comp.targetParam, existingTarget) + delete(config.targetContext, existingTarget) } } - comp.targets = newTargets + measTargets = newTargets - if len(comp.targets) == 0 { - delete(config.components, interval) + if len(measTargets) == 0 { + delete(config.measurements, interval) } - if len(config.components) == 0 { + if len(config.measurements) == 0 { shouldRemoveClient = true } @@ -511,7 +484,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, com if shouldRemoveClient { s.globalMutex.Lock() - if currentConfig, exist := s.subMap[clientID]; exist && len(currentConfig.components) == 0 { + if currentConfig, exist := s.subMap[clientID]; exist && len(currentConfig.measurements) == 0 { delete(s.subMap, clientID) } s.globalMutex.Unlock() @@ -533,7 +506,7 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { // TODO 增加一个update 函数用来更新 interval -func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { +func processRealTimeRequestCount(components []network.RealTimeMeasurementItem) int { totalTargetsCount := 0 for _, compItem := range components { totalTargetsCount += len(compItem.Targets) @@ -541,7 +514,7 @@ func processRealTimeRequestCount(components []network.RealTimeComponentItem) int return totalTargetsCount } -func processRealTimeRequestTargets(components []network.RealTimeComponentItem, targetCount int, err error) []network.TargetResult { +func processRealTimeRequestTargets(components []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult { targetProcessResults := make([]network.TargetResult, 0, targetCount) for _, componentItem := range components { for _, target := range componentItem.Targets { diff --git a/network/real_time_data_request.go b/network/real_time_data_request.go index 2dc51ab..7ace3b6 100644 --- a/network/real_time_data_request.go +++ b/network/real_time_data_request.go @@ -7,10 +7,10 @@ type RealTimeQueryRequest struct { // enum: [start, stop] Action string `json:"action" example:"start" description:"请求的操作,例如 start/stop"` // TODO 增加monitorID的example值说明 - MonitorID string `json:"monitor_id" example:"xxxx" description:"用于标识不同client的监控请求ID"` + ClientID string `json:"client_id" example:"xxxx" description:"用于标识不同client的监控请求ID"` // required: true - Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` + Measurements []RealTimeMeasurementItem `json:"measurements" description:"定义不同的数据采集策略和目标"` } // RealTimeSubRequest define struct of real time data subscription request @@ -20,11 +20,11 @@ type RealTimeSubRequest struct { Action string `json:"action" example:"start" description:"请求的操作,例如 start/stop"` ClientID string `json:"client_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` // required: true - Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` + Measurements []RealTimeMeasurementItem `json:"measurements" description:"定义不同的数据采集策略和目标"` } -// RealTimeComponentItem define struct of real time component item -type RealTimeComponentItem struct { +// RealTimeMeasurementItem define struct of real time measurement item +type RealTimeMeasurementItem struct { Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` Targets []string `json:"targets" example:"[\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\"]" description:"需要采集数据的测点或标签名称列表"` } diff --git a/util/map.go b/util/map.go new file mode 100644 index 0000000..e6f3116 --- /dev/null +++ b/util/map.go @@ -0,0 +1,11 @@ +// Package util provide some utility functions +package util + +// GetKeysFromSet define func to get all keys from a map[string]struct{} +func GetKeysFromSet(set map[string]struct{}) []string { + keys := make([]string, 0, len(set)) + for key := range set { + keys = append(keys, key) + } + return keys +} diff --git a/util/string.go b/util/string.go new file mode 100644 index 0000000..dac134a --- /dev/null +++ b/util/string.go @@ -0,0 +1,18 @@ +// Package util provide some utility functions +package util + +// RemoveTargetsFromSliceSimple define func to remove targets from a slice of strings +func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []string) []string { + targetsToRemoveSet := make(map[string]struct{}, len(targetsToRemove)) + for _, target := range targetsToRemove { + targetsToRemoveSet[target] = struct{}{} + } + + for i := len(targetsSlice) - 1; i >= 0; i-- { + if _, found := targetsToRemoveSet[targetsSlice[i]]; found { + targetsSlice[i] = targetsSlice[len(targetsSlice)-1] + targetsSlice = targetsSlice[:len(targetsSlice)-1] + } + } + return targetsSlice +}