From fca6905d748381d0658b29835f0e1b7b7205493e Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 27 Nov 2025 16:59:03 +0800 Subject: [PATCH] optimize real time data pulling and subscription api --- constants/subscription.go | 12 ++++ handler/real_time_data_pull.go | 37 ++--------- handler/real_time_data_subscription.go | 89 ++++++++++++++++++-------- network/response.go | 2 +- util/string.go | 26 ++++++++ 5 files changed, 107 insertions(+), 59 deletions(-) diff --git a/constants/subscription.go b/constants/subscription.go index ca379d2..fa22f43 100644 --- a/constants/subscription.go +++ b/constants/subscription.go @@ -16,10 +16,16 @@ const ( SubSuccessCode = "1001" // SubSuccessCode define subscription failed code SubFailedCode = "1002" + // RTDSuccessCode define real time data resturn success code + RTDSuccessCode = "1003" + // SubSuccessCode define real time data resturn failed code + RTDFailedCode = "1004" // CancelSubSuccessCode define cancel subscription success code CancelSubSuccessCode = "1005" // CancelSubFailedCode define cancel subscription failed code CancelSubFailedCode = "1006" + // SubRepeatCode define subscription repeat code + SubRepeatCode = "1007" ) const ( @@ -27,10 +33,16 @@ const ( SubSuccessMsg = "subscription success" // SubFailedMsg define subscription failed message SubFailedMsg = "subscription failed" + // RTDSuccessMsg define real time data resturn success message + RTDSuccessMsg = "real time data return success" + // RTDFailedMsg define real time data resturn failed message + RTDFailedMsg = "real time data return failed" // CancelSubSuccessMsg define cancel subscription success message CancelSubSuccessMsg = "cancel subscription success" // CancelSubFailedMsg define cancel subscription failed message CancelSubFailedMsg = "cancel subscription failed" + // SubRepeatMsg define subscription repeat message + SubRepeatMsg = "subscription repeat in target interval" ) // TargetOperationType define constant to the target operation type diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 9ca221d..5d7c79b 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -166,9 +166,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin } s.globalMutex.RUnlock() - // TODO 测试log - fmt.Printf("found subscription config for clientID:%s, start initial polling goroutines, config: %+v\n", clientID, config.components) - logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.components) + logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.measurements) config.mutex.RLock() for interval, measurementTargets := range config.measurements { @@ -219,7 +217,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin case constants.OpAppend: appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) case constants.OpRemove: - removeTargets(ctx, config, stopChanMap, transportTargets.Targets) + removeTargets(ctx, stopChanMap, transportTargets.Targets) } config.mutex.Unlock() case <-ctx.Done(): @@ -239,13 +237,13 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m for _, target := range appendTargets { targetContext, exists := config.targetContext[target] - if exists { - logger.Warn(ctx, "the append target already exists in the real time data fetch process,skipping the startup step", "target", target) + if !exists { + logger.Error(ctx, "the append target does not exists in the real time data config context map,skipping the startup step", "target", target) continue } if _, exists := stopChanMap[target]; exists { - logger.Warn(ctx, "the append target already has a stop channel, skipping the startup step", "target", target) + logger.Error(ctx, "the append target already has a stop channel, skipping the startup step", "target", target) continue } @@ -286,14 +284,8 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m } // removeTargets define func to stops running polling goroutines for targets that were removed -func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, removeTargets []string) { +func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) { for _, target := range removeTargets { - targetContext, exist := config.targetContext[target] - if !exist { - logger.Warn(ctx, "removeTarget does not exist in targetContext map, skipping remove operation", "target", target) - continue - } - stopChan, exists := stopChanMap[target] if !exists { logger.Warn(ctx, "removeTarget was not running, skipping remove operation", "target", target) @@ -302,17 +294,6 @@ func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m close(stopChan) delete(stopChanMap, target) - delete(config.targetContext, target) - - interval := targetContext.interval - measurementTargets, mesExist := config.measurements[interval] - if !mesExist { - logger.Warn(ctx, "targetContext exists but measurements is missing, cannot perform remove operation", "interval", interval, "target", target) - continue - } - measurementTargets = util.RemoveTargetsFromSliceSimple(measurementTargets, []string{target}) - config.measurements[interval] = measurementTargets - logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) } } @@ -336,7 +317,6 @@ type redisPollingConfig struct { } func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) { - // TODO 测试log,后续可删除 logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize) duration, err := time.ParseDuration(config.interval) if err != nil { @@ -347,11 +327,6 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, defer ticker.Stop() client := diagram.NewRedisClient() - startTimestamp := util.GenNanoTsStr() - - fmt.Printf("realTimeDataQueryFromRedis duration:%+v\n:", duration) - fmt.Printf("realTimeDataQueryFromRedis ticker:%+v\n:", ticker) - fmt.Printf("realTimeDataQueryFromRedis startTimestamp:%s\n", startTimestamp) needPerformQuery := true for { if needPerformQuery { diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index ecf5c78..45bf7be 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -13,6 +13,7 @@ import ( "modelRT/logger" "modelRT/network" "modelRT/orm" + "modelRT/util" "github.com/gin-gonic/gin" "github.com/gofrs/uuid" @@ -90,8 +91,6 @@ func RealTimeSubHandler(c *gin.Context) { } if request.Action == constants.SubStartAction && request.ClientID == "" { - // TODO 调试输出,待删除 - fmt.Println("00000") subAction = request.Action id, err := uuid.NewV4() if err != nil { @@ -104,8 +103,6 @@ func RealTimeSubHandler(c *gin.Context) { } clientID = id.String() } else if request.Action == constants.SubStartAction && request.ClientID != "" { - // TODO 调试输出,待删除 - fmt.Println("11111") subAction = constants.SubAppendAction clientID = request.ClientID } else if request.Action == constants.SubStopAction && request.ClientID != "" { @@ -168,8 +165,6 @@ func RealTimeSubHandler(c *gin.Context) { }) return case constants.SubAppendAction: - // TODO 调试输出,待删除 - fmt.Println("22222") results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements) if err != nil { logger.Error(c, "append target to real time data subscription config failed", "error", err) @@ -277,6 +272,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements [] meas := newMeasMap[interval] meas = append(meas, target) + newMeasMap[interval] = meas newMeasContextMap[target] = &TargetPollingContext{ interval: interval, measurement: targetModel.GetMeasurementInfo(), @@ -287,23 +283,34 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements [] } // mergeMeasurements define func to merge newMeasurementsMap into existingMeasurementsMap -func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) { +func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string { + allDuplicates := make([]string, 0) for interval, newMeas := range newMeasurements { if existingMeas, ok := config.measurements[interval]; ok { - existingMeas = append(existingMeas, newMeas...) - config.measurements[interval] = existingMeas - maps.Copy(config.targetContext, newMeasurementsContextMap) - } else { - config.measurements[interval] = newMeas + // deduplication operations prevent duplicate subscriptions to the same measurement node + deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas) + + if len(duplicates) > 0 { + for _, duplicate := range duplicates { + delete(newMeasurementsContextMap, duplicate) + } + allDuplicates = append(allDuplicates, duplicates...) + } + + if len(deduplicated) > 0 { + existingMeas = append(existingMeas, deduplicated...) + config.measurements[interval] = existingMeas + maps.Copy(config.targetContext, newMeasurementsContextMap) + } } } + return allDuplicates } // CreateConfig define function to create config in SharedSubState -func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { - requestTargetsCount := processRealTimeRequestCount(components) - targetProcessResults, _, newMeasurementsMap, MeasurementInfos := processAndValidateTargets(ctx, tx, components, requestTargetsCount) - fmt.Println(MeasurementInfos) +func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { + requestTargetsCount := processRealTimeRequestCount(measurements) + targetProcessResults, _, newMeasurementsMap, measurementContexts := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) s.globalMutex.Lock() if _, exist := s.subMap[clientID]; exist { s.globalMutex.Unlock() @@ -315,7 +322,7 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID config := &RealTimeSubConfig{ noticeChan: make(chan *transportTargets, constants.NoticeChanCap), measurements: newMeasurementsMap, - targetContext: MeasurementInfos, + targetContext: measurementContexts, } s.subMap[clientID] = config s.globalMutex.Unlock() @@ -323,8 +330,8 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID } // AppendTargets define function to append targets in SharedSubState -func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { - requestTargetsCount := processRealTimeRequestCount(components) +func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { + requestTargetsCount := processRealTimeRequestCount(measurements) s.globalMutex.RLock() config, exist := s.subMap[clientID] @@ -333,13 +340,18 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI if !exist { err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID) logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err) - return processRealTimeRequestTargets(components, requestTargetsCount, err), err + return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err } - targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, components, requestTargetsCount) + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) config.mutex.Lock() - mergeMeasurements(config, newMeasMap, newMeasContextMap) + allDuplicates := mergeMeasurements(config, newMeasMap, newMeasContextMap) + if len(allDuplicates) > 0 { + logger.Warn(ctx, "some targets are duplicate and have been ignored in append operation", "clientID", clientID, "duplicates", allDuplicates) + // process repeat target in targetProcessResults and successfulTargets + targetProcessResults, successfulTargets = filterAndDeduplicateRepeatTargets(targetProcessResults, successfulTargets, allDuplicates) + } config.mutex.Unlock() if len(successfulTargets) > 0 { @@ -353,6 +365,29 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI return targetProcessResults, nil } +func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsSlice []string, duplicates []string) ([]network.TargetResult, []string) { + filteredIDs := make([]string, 0, len(idsSlice)) + set := make(map[string]struct{}, len(duplicates)) + for _, duplicate := range duplicates { + set[duplicate] = struct{}{} + } + + for index := range resultsSlice { + if _, isTarget := set[resultsSlice[index].ID]; isTarget { + resultsSlice[index].Code = constants.SubRepeatCode + resultsSlice[index].Msg = constants.SubRepeatMsg + } + } + + for _, id := range idsSlice { + if _, isTarget := set[id]; !isTarget { + filteredIDs = append(filteredIDs, id) + } + } + + return resultsSlice, filteredIDs +} + // UpsertTargets define function to upsert targets in SharedSubState func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(measurements) @@ -506,18 +541,18 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { // TODO 增加一个update 函数用来更新 interval -func processRealTimeRequestCount(components []network.RealTimeMeasurementItem) int { +func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem) int { totalTargetsCount := 0 - for _, compItem := range components { + for _, compItem := range measurements { totalTargetsCount += len(compItem.Targets) } return totalTargetsCount } -func processRealTimeRequestTargets(components []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult { +func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult { targetProcessResults := make([]network.TargetResult, 0, targetCount) - for _, componentItem := range components { - for _, target := range componentItem.Targets { + for _, measurementItem := range measurements { + for _, target := range measurementItem.Targets { var targetResult network.TargetResult targetResult.ID = target targetResult.Code = constants.SubFailedCode diff --git a/network/response.go b/network/response.go index 6415919..f2a2a89 100644 --- a/network/response.go +++ b/network/response.go @@ -31,6 +31,6 @@ type TargetResult struct { // RealTimeSubPayload define struct of real time data subscription request type RealTimeSubPayload struct { - ClientID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` + ClientID string `json:"client_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` TargetResults []TargetResult `json:"targets"` } diff --git a/util/string.go b/util/string.go index dac134a..08192de 100644 --- a/util/string.go +++ b/util/string.go @@ -16,3 +16,29 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin } return targetsSlice } + +// SliceToSet define func to convert string slice to set +func SliceToSet(targetsSlice []string) map[string]struct{} { + set := make(map[string]struct{}, len(targetsSlice)) + for _, target := range targetsSlice { + set[target] = struct{}{} + } + return set +} + +// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates +func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) { + targetSet := SliceToSet(targetsSlice) + deduplicated = make([]string, 0, len(sourceSlice)) + // duplicate items slice + duplicates = make([]string, 0, len(sourceSlice)) + + for _, source := range sourceSlice { + if _, found := targetSet[source]; found { + duplicates = append(duplicates, source) + continue + } + deduplicated = append(deduplicated, source) + } + return deduplicated, duplicates +}