diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 47c1a05..f54d756 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -4,7 +4,9 @@ package handler import ( "context" "fmt" + "maps" "net/http" + "slices" "sort" "strconv" "time" @@ -14,7 +16,6 @@ import ( "modelRT/logger" "modelRT/model" "modelRT/network" - "modelRT/util" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -219,8 +220,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin case constants.OpRemove: removeTargets(ctx, stopChanMap, transportTargets.Targets) case constants.OpUpdate: - // TODO 处理更新操作 - fmt.Println(11111) + updateTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) } config.mutex.Unlock() case <-ctx.Done(): @@ -241,7 +241,7 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m for _, target := range appendTargets { targetContext, exists := config.targetContext[target] if !exists { - logger.Error(ctx, "the append target does not exists in the real time data config context map,skipping the startup step", "target", target) + logger.Error(ctx, "the append target does not exist in the real time data config context map,skipping the startup step", "target", target) continue } @@ -254,13 +254,11 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m stopChanMap[target] = queryGStopChan interval := targetContext.interval - measurementTargets, ok := config.measurements[interval] - if !ok { - logger.Error(ctx, "targetContext exists but measurements is missing, cannot update config", "target", target, "interval", interval) + _, exists = config.measurements[interval] + if !exists { + logger.Error(ctx, "targetContext exist but measurements is missing, cannot update config", "target", target, "interval", interval) continue } - measurementTargets = append(measurementTargets, target) - config.targetContext[target] = targetContext delete(appendTargetsSet, target) queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource) @@ -279,13 +277,72 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", targetContext.interval) } - allKeys := util.GetKeysFromSet(appendTargetsSet) + // allKeys := util.GetKeysFromSet(appendTargetsSet) + allKeys := slices.Sorted(maps.Keys(appendTargetsSet)) if len(allKeys) > 0 { logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys)) clear(appendTargetsSet) } } +// updateTargets starts new polling goroutines for targets that were just updated +func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, updateTargets []string) { + updateTargetsSet := make(map[string]struct{}, len(updateTargets)) + for _, target := range updateTargets { + updateTargetsSet[target] = struct{}{} + } + + for _, target := range updateTargets { + targetContext, exists := config.targetContext[target] + if !exists { + logger.Error(ctx, "the update target does not exist in the real time data config context map,skipping the startup step", "target", target) + continue + } + + if _, exist := stopChanMap[target]; !exist { + logger.Error(ctx, "the update target does not has a stop channel, skipping the startup step", "target", target) + continue + } + + oldQueryGStopChan := stopChanMap[target] + logger.Info(ctx, "stopped old polling goroutine for updated target", "target", target) + close(oldQueryGStopChan) + + newQueryGStopChan := make(chan struct{}) + stopChanMap[target] = newQueryGStopChan + + interval := targetContext.interval + _, exists = config.measurements[interval] + if !exists { + logger.Error(ctx, "targetContext exist but measurements is missing, cannot update config", "target", target, "interval", interval) + continue + } + delete(updateTargetsSet, target) + + queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource) + if err != nil { + logger.Error(ctx, "the update target generate redis query key identifier failed", "target", target, "error", err) + continue + } + pollingConfig := redisPollingConfig{ + targetID: target, + queryKey: queryKey, + interval: targetContext.interval, + dataSize: int64(targetContext.measurement.Size), + } + go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, newQueryGStopChan) + + logger.Info(ctx, "started new polling goroutine for update target", "target", target, "interval", targetContext.interval) + } + + // allKeys := util.GetKeysFromSet(updateTargetsSet) + allKeys := slices.Sorted(maps.Keys(updateTargetsSet)) + if len(allKeys) > 0 { + logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys)) + clear(updateTargetsSet) + } +} + // removeTargets define func to stops running polling goroutines for targets that were removed func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) { for _, target := range removeTargets { diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 607e267..536f3c8 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -293,7 +293,7 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure targetProcessResults = append(targetProcessResults, targetResult) successfulTargets = append(successfulTargets, target) - if _, ok := newMeasMap[interval]; !ok { + if _, exists := newMeasMap[interval]; !exists { newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets)) } @@ -347,7 +347,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config targetProcessResults = append(targetProcessResults, targetResult) successfulTargets = append(successfulTargets, target) - if _, ok := newMeasMap[interval]; !ok { + if _, exists := newMeasMap[interval]; !exists { newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets)) } @@ -367,7 +367,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config func mergeMeasurementsForStart(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string { allDuplicates := make([]string, 0) for interval, newMeas := range newMeasurements { - if existingMeas, ok := config.measurements[interval]; ok { + if existingMeas, exists := config.measurements[interval]; exists { // deduplication operations prevent duplicate subscriptions to the same measurement node deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas) @@ -395,7 +395,7 @@ func mergeMeasurementsForUpdate(config *RealTimeSubConfig, newMeasurements map[s for _, newMeas := range newMeasurements { for _, measurement := range newMeas { oldInterval := config.targetContext[measurement].interval - if _, ok := delMeasMap[oldInterval]; !ok { + if _, exists := delMeasMap[oldInterval]; !exists { delMeasurements := []string{measurement} delMeasMap[oldInterval] = delMeasurements } else { @@ -417,7 +417,7 @@ func mergeMeasurementsForUpdate(config *RealTimeSubConfig, newMeasurements map[s } for interval, newMeas := range newMeasurements { - if existingMeas, ok := config.measurements[interval]; ok { + if existingMeas, exists := config.measurements[interval]; exists { deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas) if len(duplicates) > 0 { @@ -704,8 +704,8 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { s.globalMutex.RLock() defer s.globalMutex.RUnlock() - config, ok := s.subMap[clientID] - if !ok { + config, exists := s.subMap[clientID] + if !exists { return nil, false } return config, true diff --git a/model/redis_recommend.go b/model/redis_recommend.go index c885cde..32fdb17 100644 --- a/model/redis_recommend.go +++ b/model/redis_recommend.go @@ -13,6 +13,7 @@ import ( "github.com/RediSearch/redisearch-go/v2/redisearch" redigo "github.com/gomodule/redigo/redis" + "github.com/redis/go-redis/v9" ) var ac *redisearch.Autocompleter @@ -28,25 +29,30 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er if input == "" { // 返回所有 grid 名 - return getAllGridKeys(ctx, constants.RedisAllGridSetKey) + return getKeyBySpecificsLevel(ctx, rdb, 1, input) } inputSlice := strings.Split(input, ".") inputSliceLen := len(inputSlice) - originInputLen := len(inputSlice) switch inputSliceLen { case 1: - // TODO 优化成NewSet的形式 - gridExist, err := rdb.SIsMember(ctx, constants.RedisAllGridSetKey, input).Result() + // grid search + gridSearchInput := inputSlice[0] + gridExists, err := rdb.SIsMember(ctx, constants.RedisAllGridSetKey, gridSearchInput).Result() if err != nil { logger.Error(ctx, "check grid key exist failed ", "grid_key", input, "error", err) return []string{}, false, err } - searchInput := input - inputLen := inputSliceLen - for inputLen != 0 && !gridExist { + if gridExists { + return []string{"."}, false, err + } + + // start grid fuzzy search + searchInput := gridSearchInput + searchInputLen := len(searchInput) + for searchInputLen != 0 && !gridExists { results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{ Num: math.MaxInt16, Fuzzy: true, @@ -54,33 +60,196 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er WithPayloads: false, }) if err != nil { - logger.Error(ctx, "query info by fuzzy failed", "query_key", input, "error", err) + logger.Error(ctx, "query grid key by redis fuzzy search failed", "query_key", searchInput, "error", err) return []string{}, false, err } if len(results) == 0 { - // TODO 考虑使用其他方式代替for 循环退一字节的查询方式 + // TODO 考虑使用其他方式代替 for 循环退一字节的查询方式 searchInput = searchInput[:len(searchInput)-1] - inputLen = len(searchInput) + searchInputLen = len(searchInput) continue } var recommends []string for _, result := range results { termSlice := strings.Split(result.Term, ".") - if len(termSlice) <= originInputLen { + if len(termSlice) <= inputSliceLen { recommends = append(recommends, result.Term) } } - // 返回模糊查询结果 + // return fuzzy search results return recommends, true, nil } - - // 处理 input 不为空、不含有.并且 input 是一个完整的 grid key 的情况 - if strings.HasSuffix(input, ".") == false { - recommend := input + "." - return []string{recommend}, false, nil + case 2: + // zone search + zoneSearchInput := inputSlice[1] + if zoneSearchInput == "" { + specificalGrid := inputSlice[0] + allZones, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalGrid) + recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allZones) + return recommandResults, isFuzzy, err } + + zoneExists, err := rdb.SIsMember(ctx, constants.RedisAllZoneSetKey, zoneSearchInput).Result() + if err != nil { + logger.Error(ctx, "check zone key exist failed ", "zone_key", zoneSearchInput, "error", err) + return []string{}, false, err + } + + if zoneExists { + return []string{"."}, false, err + } + + // start zone fuzzy search + searchInput := zoneSearchInput + searchInputLen := len(searchInput) + for searchInputLen != 0 && !zoneExists { + results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{ + Num: math.MaxInt16, + Fuzzy: true, + WithScores: false, + WithPayloads: false, + }) + if err != nil { + logger.Error(ctx, "query zone key by redis fuzzy search failed", "query_key", searchInput, "error", err) + return []string{}, false, err + } + + if len(results) == 0 { + // TODO 考虑使用其他方式代替 for 循环退一字节的查询方式 + searchInput = searchInput[:len(searchInput)-1] + searchInputLen = len(searchInput) + continue + } + + var recommends []string + for _, result := range results { + termSlice := strings.Split(result.Term, ".") + if len(termSlice) <= inputSliceLen { + recommends = append(recommends, result.Term) + } + } + // return fuzzy search results + return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil + } + case 3: + // station search + stationSearchInput := inputSlice[2] + fmt.Println(stationSearchInput) + if stationSearchInput == "" { + specificalZone := inputSlice[1] + allStations, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalZone) + recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allStations) + return recommandResults, isFuzzy, err + } + + stationExists, err := rdb.SIsMember(ctx, constants.RedisAllStationSetKey, stationSearchInput).Result() + if err != nil { + logger.Error(ctx, "check station key exist failed ", "station_key", stationSearchInput, "error", err) + return []string{}, false, err + } + + if stationExists { + return []string{"."}, false, err + } + + // start grid fuzzy search + searchInput := stationSearchInput + searchInputLen := len(searchInput) + for searchInputLen != 0 && !stationExists { + results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{ + Num: math.MaxInt16, + Fuzzy: true, + WithScores: false, + WithPayloads: false, + }) + if err != nil { + logger.Error(ctx, "query station key by redis fuzzy search failed", "query_key", searchInput, "error", err) + return []string{}, false, err + } + + if len(results) == 0 { + // TODO 考虑使用其他方式代替 for 循环退一字节的查询方式 + searchInput = searchInput[:len(searchInput)-1] + searchInputLen = len(searchInput) + continue + } + + var recommends []string + for _, result := range results { + termSlice := strings.Split(result.Term, ".") + if len(termSlice) <= inputSliceLen { + recommends = append(recommends, result.Term) + } + } + // return fuzzy search results + return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil + } + case 4: + // component nspath search + componentSearchInput := inputSlice[3] + if componentSearchInput == "" { + specificalStation := inputSlice[1] + allComponents, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalStation) + recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allComponents) + return recommandResults, isFuzzy, err + } + + componentExists, err := rdb.SIsMember(ctx, constants.RedisAllStationSetKey, componentSearchInput).Result() + if err != nil { + logger.Error(ctx, "check component key exist failed ", "component_key", componentSearchInput, "error", err) + return []string{}, false, err + } + + if componentExists { + return []string{"."}, false, err + } + + // start grid fuzzy search + searchInput := componentSearchInput + searchInputLen := len(searchInput) + for searchInputLen != 0 && !componentExists { + results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{ + Num: math.MaxInt16, + Fuzzy: true, + WithScores: false, + WithPayloads: false, + }) + if err != nil { + logger.Error(ctx, "query station key by redis fuzzy search failed", "query_key", searchInput, "error", err) + return []string{}, false, err + } + + if len(results) == 0 { + // TODO 考虑使用其他方式代替 for 循环退一字节的查询方式 + searchInput = searchInput[:len(searchInput)-1] + searchInputLen = len(searchInput) + continue + } + + var recommends []string + for _, result := range results { + termSlice := strings.Split(result.Term, ".") + if len(termSlice) <= inputSliceLen { + recommends = append(recommends, result.Term) + } + } + // return fuzzy search results + return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil + } + case 5: + // component tag search + compTagSearchInput := inputSlice[4] + fmt.Println(compTagSearchInput) + case 6: + // configuration search + configSearchInput := inputSlice[5] + fmt.Println(configSearchInput) + case 7: + // measurement search + measSearchInput := inputSlice[6] + fmt.Println(measSearchInput) default: lastInput := inputSlice[inputSliceLen-1] // 判断 queryKey 是否是空值,空值则返回上一级别下的所有key @@ -141,31 +310,37 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er return []string{}, false, nil } -func getAllGridKeys(ctx context.Context, setKey string) ([]string, bool, error) { - // 从redis set 中获取所有的 grid key - gridSets := diagram.NewRedisSet(ctx, setKey, 10, true) - keys, err := gridSets.SMembers("grid_keys") +func getKeyBySpecificsLevel(ctx context.Context, rdb *redis.Client, inputLen int, input string) ([]string, bool, error) { + queryKey := getSpecificKeyByLength(inputLen, input) + results, err := rdb.SMembers(ctx, queryKey).Result() if err != nil { - return []string{}, false, fmt.Errorf("get all root keys failed, error: %v", err) - } - return keys, false, nil -} - -func getSpecificZoneKeys(ctx context.Context, input string) ([]string, bool, error) { - setKey := fmt.Sprintf(constants.RedisSpecGridZoneSetKey, input) - zoneSets := diagram.NewRedisSet(ctx, setKey, 10, true) - keys, err := zoneSets.SMembers(setKey) - if err != nil { - return []string{}, false, fmt.Errorf("get all root keys failed, error: %v", err) - } - var results []string - for _, key := range keys { - result := input + "." + key - results = append(results, result) + return []string{}, false, fmt.Errorf("get all root keys failed, error: %w", err) } return results, false, nil } +func combineQueryResultByInput(inputSliceLen int, inputSlice []string, queryResults []string) []string { + prefixs := make([]string, 0, len(inputSlice)) + recommandResults := make([]string, 0, len(queryResults)) + switch inputSliceLen { + case 2: + prefixs = []string{inputSlice[0]} + case 3: + prefixs = inputSlice[0:2] + default: + return []string{} + } + + for _, queryResult := range queryResults { + combineStrs := make([]string, 0, len(inputSlice)) + combineStrs = append(combineStrs, prefixs...) + combineStrs = append(combineStrs, queryResult) + recommandResult := strings.Join(combineStrs, ".") + recommandResults = append(recommandResults, recommandResult) + } + return recommandResults +} + func getConstantsKeyByLength(inputLen int) string { switch inputLen { case 1: @@ -181,6 +356,21 @@ func getConstantsKeyByLength(inputLen int) string { } } +func getSpecificKeyByLength(inputLen int, input string) string { + switch inputLen { + case 1: + return constants.RedisAllGridSetKey + case 2: + return fmt.Sprintf(constants.RedisSpecGridZoneSetKey, input) + case 3: + return fmt.Sprintf(constants.RedisSpecZoneStationSetKey, input) + case 4: + return fmt.Sprintf(constants.RedisSpecStationComponentSetKey, input) + default: + return constants.RedisAllGridSetKey + } +} + func getCombinedConstantsKeyByLength(key string, inputLen int) string { switch inputLen { case 2: