optimize measurement recommend api

This commit is contained in:
douxu 2025-12-03 16:55:14 +08:00
parent 8a4116879b
commit b99c03296a
3 changed files with 301 additions and 54 deletions

View File

@ -4,7 +4,9 @@ package handler
import (
"context"
"fmt"
"maps"
"net/http"
"slices"
"sort"
"strconv"
"time"
@ -14,7 +16,6 @@ import (
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/util"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@ -219,8 +220,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin
case constants.OpRemove:
removeTargets(ctx, stopChanMap, transportTargets.Targets)
case constants.OpUpdate:
// TODO 处理更新操作
fmt.Println(11111)
updateTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
}
config.mutex.Unlock()
case <-ctx.Done():
@ -241,7 +241,7 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
for _, target := range appendTargets {
targetContext, exists := config.targetContext[target]
if !exists {
logger.Error(ctx, "the append target does not exists in the real time data config context map,skipping the startup step", "target", target)
logger.Error(ctx, "the append target does not exist in the real time data config context map,skipping the startup step", "target", target)
continue
}
@ -254,13 +254,11 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
stopChanMap[target] = queryGStopChan
interval := targetContext.interval
measurementTargets, ok := config.measurements[interval]
if !ok {
logger.Error(ctx, "targetContext exists but measurements is missing, cannot update config", "target", target, "interval", interval)
_, exists = config.measurements[interval]
if !exists {
logger.Error(ctx, "targetContext exist but measurements is missing, cannot update config", "target", target, "interval", interval)
continue
}
measurementTargets = append(measurementTargets, target)
config.targetContext[target] = targetContext
delete(appendTargetsSet, target)
queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource)
@ -279,13 +277,72 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", targetContext.interval)
}
allKeys := util.GetKeysFromSet(appendTargetsSet)
// allKeys := util.GetKeysFromSet(appendTargetsSet)
allKeys := slices.Sorted(maps.Keys(appendTargetsSet))
if len(allKeys) > 0 {
logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys))
clear(appendTargetsSet)
}
}
// updateTargets starts new polling goroutines for targets that were just updated
func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, updateTargets []string) {
updateTargetsSet := make(map[string]struct{}, len(updateTargets))
for _, target := range updateTargets {
updateTargetsSet[target] = struct{}{}
}
for _, target := range updateTargets {
targetContext, exists := config.targetContext[target]
if !exists {
logger.Error(ctx, "the update target does not exist in the real time data config context map,skipping the startup step", "target", target)
continue
}
if _, exist := stopChanMap[target]; !exist {
logger.Error(ctx, "the update target does not has a stop channel, skipping the startup step", "target", target)
continue
}
oldQueryGStopChan := stopChanMap[target]
logger.Info(ctx, "stopped old polling goroutine for updated target", "target", target)
close(oldQueryGStopChan)
newQueryGStopChan := make(chan struct{})
stopChanMap[target] = newQueryGStopChan
interval := targetContext.interval
_, exists = config.measurements[interval]
if !exists {
logger.Error(ctx, "targetContext exist but measurements is missing, cannot update config", "target", target, "interval", interval)
continue
}
delete(updateTargetsSet, target)
queryKey, err := model.GenerateMeasureIdentifier(targetContext.measurement.DataSource)
if err != nil {
logger.Error(ctx, "the update target generate redis query key identifier failed", "target", target, "error", err)
continue
}
pollingConfig := redisPollingConfig{
targetID: target,
queryKey: queryKey,
interval: targetContext.interval,
dataSize: int64(targetContext.measurement.Size),
}
go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, newQueryGStopChan)
logger.Info(ctx, "started new polling goroutine for update target", "target", target, "interval", targetContext.interval)
}
// allKeys := util.GetKeysFromSet(updateTargetsSet)
allKeys := slices.Sorted(maps.Keys(updateTargetsSet))
if len(allKeys) > 0 {
logger.Warn(ctx, fmt.Sprintf("the following targets:%v start up fetch real time data process goroutine not started", allKeys))
clear(updateTargetsSet)
}
}
// removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) {
for _, target := range removeTargets {

View File

@ -293,7 +293,7 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
if _, ok := newMeasMap[interval]; !ok {
if _, exists := newMeasMap[interval]; !exists {
newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets))
}
@ -347,7 +347,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
if _, ok := newMeasMap[interval]; !ok {
if _, exists := newMeasMap[interval]; !exists {
newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets))
}
@ -367,7 +367,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
func mergeMeasurementsForStart(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string {
allDuplicates := make([]string, 0)
for interval, newMeas := range newMeasurements {
if existingMeas, ok := config.measurements[interval]; ok {
if existingMeas, exists := config.measurements[interval]; exists {
// deduplication operations prevent duplicate subscriptions to the same measurement node
deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas)
@ -395,7 +395,7 @@ func mergeMeasurementsForUpdate(config *RealTimeSubConfig, newMeasurements map[s
for _, newMeas := range newMeasurements {
for _, measurement := range newMeas {
oldInterval := config.targetContext[measurement].interval
if _, ok := delMeasMap[oldInterval]; !ok {
if _, exists := delMeasMap[oldInterval]; !exists {
delMeasurements := []string{measurement}
delMeasMap[oldInterval] = delMeasurements
} else {
@ -417,7 +417,7 @@ func mergeMeasurementsForUpdate(config *RealTimeSubConfig, newMeasurements map[s
}
for interval, newMeas := range newMeasurements {
if existingMeas, ok := config.measurements[interval]; ok {
if existingMeas, exists := config.measurements[interval]; exists {
deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas)
if len(duplicates) > 0 {
@ -704,8 +704,8 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) {
s.globalMutex.RLock()
defer s.globalMutex.RUnlock()
config, ok := s.subMap[clientID]
if !ok {
config, exists := s.subMap[clientID]
if !exists {
return nil, false
}
return config, true

View File

@ -13,6 +13,7 @@ import (
"github.com/RediSearch/redisearch-go/v2/redisearch"
redigo "github.com/gomodule/redigo/redis"
"github.com/redis/go-redis/v9"
)
var ac *redisearch.Autocompleter
@ -28,25 +29,30 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er
if input == "" {
// 返回所有 grid 名
return getAllGridKeys(ctx, constants.RedisAllGridSetKey)
return getKeyBySpecificsLevel(ctx, rdb, 1, input)
}
inputSlice := strings.Split(input, ".")
inputSliceLen := len(inputSlice)
originInputLen := len(inputSlice)
switch inputSliceLen {
case 1:
// TODO 优化成NewSet的形式
gridExist, err := rdb.SIsMember(ctx, constants.RedisAllGridSetKey, input).Result()
// grid search
gridSearchInput := inputSlice[0]
gridExists, err := rdb.SIsMember(ctx, constants.RedisAllGridSetKey, gridSearchInput).Result()
if err != nil {
logger.Error(ctx, "check grid key exist failed ", "grid_key", input, "error", err)
return []string{}, false, err
}
searchInput := input
inputLen := inputSliceLen
for inputLen != 0 && !gridExist {
if gridExists {
return []string{"."}, false, err
}
// start grid fuzzy search
searchInput := gridSearchInput
searchInputLen := len(searchInput)
for searchInputLen != 0 && !gridExists {
results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{
Num: math.MaxInt16,
Fuzzy: true,
@ -54,33 +60,196 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er
WithPayloads: false,
})
if err != nil {
logger.Error(ctx, "query info by fuzzy failed", "query_key", input, "error", err)
logger.Error(ctx, "query grid key by redis fuzzy search failed", "query_key", searchInput, "error", err)
return []string{}, false, err
}
if len(results) == 0 {
// TODO 考虑使用其他方式代替 for 循环退一字节的查询方式
searchInput = searchInput[:len(searchInput)-1]
inputLen = len(searchInput)
searchInputLen = len(searchInput)
continue
}
var recommends []string
for _, result := range results {
termSlice := strings.Split(result.Term, ".")
if len(termSlice) <= originInputLen {
if len(termSlice) <= inputSliceLen {
recommends = append(recommends, result.Term)
}
}
// 返回模糊查询结果
// return fuzzy search results
return recommends, true, nil
}
// 处理 input 不为空、不含有.并且 input 是一个完整的 grid key 的情况
if strings.HasSuffix(input, ".") == false {
recommend := input + "."
return []string{recommend}, false, nil
case 2:
// zone search
zoneSearchInput := inputSlice[1]
if zoneSearchInput == "" {
specificalGrid := inputSlice[0]
allZones, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalGrid)
recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allZones)
return recommandResults, isFuzzy, err
}
zoneExists, err := rdb.SIsMember(ctx, constants.RedisAllZoneSetKey, zoneSearchInput).Result()
if err != nil {
logger.Error(ctx, "check zone key exist failed ", "zone_key", zoneSearchInput, "error", err)
return []string{}, false, err
}
if zoneExists {
return []string{"."}, false, err
}
// start zone fuzzy search
searchInput := zoneSearchInput
searchInputLen := len(searchInput)
for searchInputLen != 0 && !zoneExists {
results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{
Num: math.MaxInt16,
Fuzzy: true,
WithScores: false,
WithPayloads: false,
})
if err != nil {
logger.Error(ctx, "query zone key by redis fuzzy search failed", "query_key", searchInput, "error", err)
return []string{}, false, err
}
if len(results) == 0 {
// TODO 考虑使用其他方式代替 for 循环退一字节的查询方式
searchInput = searchInput[:len(searchInput)-1]
searchInputLen = len(searchInput)
continue
}
var recommends []string
for _, result := range results {
termSlice := strings.Split(result.Term, ".")
if len(termSlice) <= inputSliceLen {
recommends = append(recommends, result.Term)
}
}
// return fuzzy search results
return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil
}
case 3:
// station search
stationSearchInput := inputSlice[2]
fmt.Println(stationSearchInput)
if stationSearchInput == "" {
specificalZone := inputSlice[1]
allStations, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalZone)
recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allStations)
return recommandResults, isFuzzy, err
}
stationExists, err := rdb.SIsMember(ctx, constants.RedisAllStationSetKey, stationSearchInput).Result()
if err != nil {
logger.Error(ctx, "check station key exist failed ", "station_key", stationSearchInput, "error", err)
return []string{}, false, err
}
if stationExists {
return []string{"."}, false, err
}
// start grid fuzzy search
searchInput := stationSearchInput
searchInputLen := len(searchInput)
for searchInputLen != 0 && !stationExists {
results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{
Num: math.MaxInt16,
Fuzzy: true,
WithScores: false,
WithPayloads: false,
})
if err != nil {
logger.Error(ctx, "query station key by redis fuzzy search failed", "query_key", searchInput, "error", err)
return []string{}, false, err
}
if len(results) == 0 {
// TODO 考虑使用其他方式代替 for 循环退一字节的查询方式
searchInput = searchInput[:len(searchInput)-1]
searchInputLen = len(searchInput)
continue
}
var recommends []string
for _, result := range results {
termSlice := strings.Split(result.Term, ".")
if len(termSlice) <= inputSliceLen {
recommends = append(recommends, result.Term)
}
}
// return fuzzy search results
return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil
}
case 4:
// component nspath search
componentSearchInput := inputSlice[3]
if componentSearchInput == "" {
specificalStation := inputSlice[1]
allComponents, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, inputSliceLen, specificalStation)
recommandResults := combineQueryResultByInput(inputSliceLen, inputSlice, allComponents)
return recommandResults, isFuzzy, err
}
componentExists, err := rdb.SIsMember(ctx, constants.RedisAllStationSetKey, componentSearchInput).Result()
if err != nil {
logger.Error(ctx, "check component key exist failed ", "component_key", componentSearchInput, "error", err)
return []string{}, false, err
}
if componentExists {
return []string{"."}, false, err
}
// start grid fuzzy search
searchInput := componentSearchInput
searchInputLen := len(searchInput)
for searchInputLen != 0 && !componentExists {
results, err := ac.SuggestOpts(searchInput, redisearch.SuggestOptions{
Num: math.MaxInt16,
Fuzzy: true,
WithScores: false,
WithPayloads: false,
})
if err != nil {
logger.Error(ctx, "query station key by redis fuzzy search failed", "query_key", searchInput, "error", err)
return []string{}, false, err
}
if len(results) == 0 {
// TODO 考虑使用其他方式代替 for 循环退一字节的查询方式
searchInput = searchInput[:len(searchInput)-1]
searchInputLen = len(searchInput)
continue
}
var recommends []string
for _, result := range results {
termSlice := strings.Split(result.Term, ".")
if len(termSlice) <= inputSliceLen {
recommends = append(recommends, result.Term)
}
}
// return fuzzy search results
return combineQueryResultByInput(inputSliceLen, inputSlice, recommends), true, nil
}
case 5:
// component tag search
compTagSearchInput := inputSlice[4]
fmt.Println(compTagSearchInput)
case 6:
// configuration search
configSearchInput := inputSlice[5]
fmt.Println(configSearchInput)
case 7:
// measurement search
measSearchInput := inputSlice[6]
fmt.Println(measSearchInput)
default:
lastInput := inputSlice[inputSliceLen-1]
// 判断 queryKey 是否是空值空值则返回上一级别下的所有key
@ -141,31 +310,37 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er
return []string{}, false, nil
}
func getAllGridKeys(ctx context.Context, setKey string) ([]string, bool, error) {
// 从redis set 中获取所有的 grid key
gridSets := diagram.NewRedisSet(ctx, setKey, 10, true)
keys, err := gridSets.SMembers("grid_keys")
func getKeyBySpecificsLevel(ctx context.Context, rdb *redis.Client, inputLen int, input string) ([]string, bool, error) {
queryKey := getSpecificKeyByLength(inputLen, input)
results, err := rdb.SMembers(ctx, queryKey).Result()
if err != nil {
return []string{}, false, fmt.Errorf("get all root keys failed, error: %v", err)
}
return keys, false, nil
}
func getSpecificZoneKeys(ctx context.Context, input string) ([]string, bool, error) {
setKey := fmt.Sprintf(constants.RedisSpecGridZoneSetKey, input)
zoneSets := diagram.NewRedisSet(ctx, setKey, 10, true)
keys, err := zoneSets.SMembers(setKey)
if err != nil {
return []string{}, false, fmt.Errorf("get all root keys failed, error: %v", err)
}
var results []string
for _, key := range keys {
result := input + "." + key
results = append(results, result)
return []string{}, false, fmt.Errorf("get all root keys failed, error: %w", err)
}
return results, false, nil
}
func combineQueryResultByInput(inputSliceLen int, inputSlice []string, queryResults []string) []string {
prefixs := make([]string, 0, len(inputSlice))
recommandResults := make([]string, 0, len(queryResults))
switch inputSliceLen {
case 2:
prefixs = []string{inputSlice[0]}
case 3:
prefixs = inputSlice[0:2]
default:
return []string{}
}
for _, queryResult := range queryResults {
combineStrs := make([]string, 0, len(inputSlice))
combineStrs = append(combineStrs, prefixs...)
combineStrs = append(combineStrs, queryResult)
recommandResult := strings.Join(combineStrs, ".")
recommandResults = append(recommandResults, recommandResult)
}
return recommandResults
}
func getConstantsKeyByLength(inputLen int) string {
switch inputLen {
case 1:
@ -181,6 +356,21 @@ func getConstantsKeyByLength(inputLen int) string {
}
}
func getSpecificKeyByLength(inputLen int, input string) string {
switch inputLen {
case 1:
return constants.RedisAllGridSetKey
case 2:
return fmt.Sprintf(constants.RedisSpecGridZoneSetKey, input)
case 3:
return fmt.Sprintf(constants.RedisSpecZoneStationSetKey, input)
case 4:
return fmt.Sprintf(constants.RedisSpecStationComponentSetKey, input)
default:
return constants.RedisAllGridSetKey
}
}
func getCombinedConstantsKeyByLength(key string, inputLen int) string {
switch inputLen {
case 2: