optimize real time data pulling and subscription api

This commit is contained in:
douxu 2025-11-27 16:59:03 +08:00
parent 6f3134b5e9
commit fca6905d74
5 changed files with 107 additions and 59 deletions

View File

@ -16,10 +16,16 @@ const (
SubSuccessCode = "1001" SubSuccessCode = "1001"
// SubSuccessCode define subscription failed code // SubSuccessCode define subscription failed code
SubFailedCode = "1002" SubFailedCode = "1002"
// RTDSuccessCode define real time data resturn success code
RTDSuccessCode = "1003"
// SubSuccessCode define real time data resturn failed code
RTDFailedCode = "1004"
// CancelSubSuccessCode define cancel subscription success code // CancelSubSuccessCode define cancel subscription success code
CancelSubSuccessCode = "1005" CancelSubSuccessCode = "1005"
// CancelSubFailedCode define cancel subscription failed code // CancelSubFailedCode define cancel subscription failed code
CancelSubFailedCode = "1006" CancelSubFailedCode = "1006"
// SubRepeatCode define subscription repeat code
SubRepeatCode = "1007"
) )
const ( const (
@ -27,10 +33,16 @@ const (
SubSuccessMsg = "subscription success" SubSuccessMsg = "subscription success"
// SubFailedMsg define subscription failed message // SubFailedMsg define subscription failed message
SubFailedMsg = "subscription failed" SubFailedMsg = "subscription failed"
// RTDSuccessMsg define real time data resturn success message
RTDSuccessMsg = "real time data return success"
// RTDFailedMsg define real time data resturn failed message
RTDFailedMsg = "real time data return failed"
// CancelSubSuccessMsg define cancel subscription success message // CancelSubSuccessMsg define cancel subscription success message
CancelSubSuccessMsg = "cancel subscription success" CancelSubSuccessMsg = "cancel subscription success"
// CancelSubFailedMsg define cancel subscription failed message // CancelSubFailedMsg define cancel subscription failed message
CancelSubFailedMsg = "cancel subscription failed" CancelSubFailedMsg = "cancel subscription failed"
// SubRepeatMsg define subscription repeat message
SubRepeatMsg = "subscription repeat in target interval"
) )
// TargetOperationType define constant to the target operation type // TargetOperationType define constant to the target operation type

View File

@ -166,9 +166,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin
} }
s.globalMutex.RUnlock() s.globalMutex.RUnlock()
// TODO 测试log logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.measurements)
fmt.Printf("found subscription config for clientID:%s, start initial polling goroutines, config: %+v\n", clientID, config.components)
logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.components)
config.mutex.RLock() config.mutex.RLock()
for interval, measurementTargets := range config.measurements { for interval, measurementTargets := range config.measurements {
@ -219,7 +217,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin
case constants.OpAppend: case constants.OpAppend:
appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
case constants.OpRemove: case constants.OpRemove:
removeTargets(ctx, config, stopChanMap, transportTargets.Targets) removeTargets(ctx, stopChanMap, transportTargets.Targets)
} }
config.mutex.Unlock() config.mutex.Unlock()
case <-ctx.Done(): case <-ctx.Done():
@ -239,13 +237,13 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
for _, target := range appendTargets { for _, target := range appendTargets {
targetContext, exists := config.targetContext[target] targetContext, exists := config.targetContext[target]
if exists { if !exists {
logger.Warn(ctx, "the append target already exists in the real time data fetch process,skipping the startup step", "target", target) logger.Error(ctx, "the append target does not exists in the real time data config context map,skipping the startup step", "target", target)
continue continue
} }
if _, exists := stopChanMap[target]; exists { if _, exists := stopChanMap[target]; exists {
logger.Warn(ctx, "the append target already has a stop channel, skipping the startup step", "target", target) logger.Error(ctx, "the append target already has a stop channel, skipping the startup step", "target", target)
continue continue
} }
@ -286,14 +284,8 @@ func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
} }
// removeTargets define func to stops running polling goroutines for targets that were removed // removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, removeTargets []string) { func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) {
for _, target := range removeTargets { for _, target := range removeTargets {
targetContext, exist := config.targetContext[target]
if !exist {
logger.Warn(ctx, "removeTarget does not exist in targetContext map, skipping remove operation", "target", target)
continue
}
stopChan, exists := stopChanMap[target] stopChan, exists := stopChanMap[target]
if !exists { if !exists {
logger.Warn(ctx, "removeTarget was not running, skipping remove operation", "target", target) logger.Warn(ctx, "removeTarget was not running, skipping remove operation", "target", target)
@ -302,17 +294,6 @@ func removeTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
close(stopChan) close(stopChan)
delete(stopChanMap, target) delete(stopChanMap, target)
delete(config.targetContext, target)
interval := targetContext.interval
measurementTargets, mesExist := config.measurements[interval]
if !mesExist {
logger.Warn(ctx, "targetContext exists but measurements is missing, cannot perform remove operation", "interval", interval, "target", target)
continue
}
measurementTargets = util.RemoveTargetsFromSliceSimple(measurementTargets, []string{target})
config.measurements[interval] = measurementTargets
logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) logger.Info(ctx, "stopped polling goroutine for removed target", "target", target)
} }
} }
@ -336,7 +317,6 @@ type redisPollingConfig struct {
} }
func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) { func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) {
// TODO 测试log后续可删除
logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize) logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize)
duration, err := time.ParseDuration(config.interval) duration, err := time.ParseDuration(config.interval)
if err != nil { if err != nil {
@ -347,11 +327,6 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
defer ticker.Stop() defer ticker.Stop()
client := diagram.NewRedisClient() client := diagram.NewRedisClient()
startTimestamp := util.GenNanoTsStr()
fmt.Printf("realTimeDataQueryFromRedis duration:%+v\n:", duration)
fmt.Printf("realTimeDataQueryFromRedis ticker:%+v\n:", ticker)
fmt.Printf("realTimeDataQueryFromRedis startTimestamp:%s\n", startTimestamp)
needPerformQuery := true needPerformQuery := true
for { for {
if needPerformQuery { if needPerformQuery {

View File

@ -13,6 +13,7 @@ import (
"modelRT/logger" "modelRT/logger"
"modelRT/network" "modelRT/network"
"modelRT/orm" "modelRT/orm"
"modelRT/util"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gofrs/uuid" "github.com/gofrs/uuid"
@ -90,8 +91,6 @@ func RealTimeSubHandler(c *gin.Context) {
} }
if request.Action == constants.SubStartAction && request.ClientID == "" { if request.Action == constants.SubStartAction && request.ClientID == "" {
// TODO 调试输出,待删除
fmt.Println("00000")
subAction = request.Action subAction = request.Action
id, err := uuid.NewV4() id, err := uuid.NewV4()
if err != nil { if err != nil {
@ -104,8 +103,6 @@ func RealTimeSubHandler(c *gin.Context) {
} }
clientID = id.String() clientID = id.String()
} else if request.Action == constants.SubStartAction && request.ClientID != "" { } else if request.Action == constants.SubStartAction && request.ClientID != "" {
// TODO 调试输出,待删除
fmt.Println("11111")
subAction = constants.SubAppendAction subAction = constants.SubAppendAction
clientID = request.ClientID clientID = request.ClientID
} else if request.Action == constants.SubStopAction && request.ClientID != "" { } else if request.Action == constants.SubStopAction && request.ClientID != "" {
@ -168,8 +165,6 @@ func RealTimeSubHandler(c *gin.Context) {
}) })
return return
case constants.SubAppendAction: case constants.SubAppendAction:
// TODO 调试输出,待删除
fmt.Println("22222")
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements) results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
if err != nil { if err != nil {
logger.Error(c, "append target to real time data subscription config failed", "error", err) logger.Error(c, "append target to real time data subscription config failed", "error", err)
@ -277,6 +272,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements []
meas := newMeasMap[interval] meas := newMeasMap[interval]
meas = append(meas, target) meas = append(meas, target)
newMeasMap[interval] = meas
newMeasContextMap[target] = &TargetPollingContext{ newMeasContextMap[target] = &TargetPollingContext{
interval: interval, interval: interval,
measurement: targetModel.GetMeasurementInfo(), measurement: targetModel.GetMeasurementInfo(),
@ -287,23 +283,34 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements []
} }
// mergeMeasurements define func to merge newMeasurementsMap into existingMeasurementsMap // mergeMeasurements define func to merge newMeasurementsMap into existingMeasurementsMap
func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) { func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string {
allDuplicates := make([]string, 0)
for interval, newMeas := range newMeasurements { for interval, newMeas := range newMeasurements {
if existingMeas, ok := config.measurements[interval]; ok { if existingMeas, ok := config.measurements[interval]; ok {
existingMeas = append(existingMeas, newMeas...) // deduplication operations prevent duplicate subscriptions to the same measurement node
deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas)
if len(duplicates) > 0 {
for _, duplicate := range duplicates {
delete(newMeasurementsContextMap, duplicate)
}
allDuplicates = append(allDuplicates, duplicates...)
}
if len(deduplicated) > 0 {
existingMeas = append(existingMeas, deduplicated...)
config.measurements[interval] = existingMeas config.measurements[interval] = existingMeas
maps.Copy(config.targetContext, newMeasurementsContextMap) maps.Copy(config.targetContext, newMeasurementsContextMap)
} else {
config.measurements[interval] = newMeas
} }
} }
} }
return allDuplicates
}
// CreateConfig define function to create config in SharedSubState // CreateConfig define function to create config in SharedSubState
func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components) requestTargetsCount := processRealTimeRequestCount(measurements)
targetProcessResults, _, newMeasurementsMap, MeasurementInfos := processAndValidateTargets(ctx, tx, components, requestTargetsCount) targetProcessResults, _, newMeasurementsMap, measurementContexts := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount)
fmt.Println(MeasurementInfos)
s.globalMutex.Lock() s.globalMutex.Lock()
if _, exist := s.subMap[clientID]; exist { if _, exist := s.subMap[clientID]; exist {
s.globalMutex.Unlock() s.globalMutex.Unlock()
@ -315,7 +322,7 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID
config := &RealTimeSubConfig{ config := &RealTimeSubConfig{
noticeChan: make(chan *transportTargets, constants.NoticeChanCap), noticeChan: make(chan *transportTargets, constants.NoticeChanCap),
measurements: newMeasurementsMap, measurements: newMeasurementsMap,
targetContext: MeasurementInfos, targetContext: measurementContexts,
} }
s.subMap[clientID] = config s.subMap[clientID] = config
s.globalMutex.Unlock() s.globalMutex.Unlock()
@ -323,8 +330,8 @@ func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID
} }
// AppendTargets define function to append targets in SharedSubState // AppendTargets define function to append targets in SharedSubState
func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components) requestTargetsCount := processRealTimeRequestCount(measurements)
s.globalMutex.RLock() s.globalMutex.RLock()
config, exist := s.subMap[clientID] config, exist := s.subMap[clientID]
@ -333,13 +340,18 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
if !exist { if !exist {
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID) err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err) logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
} }
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, components, requestTargetsCount) targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount)
config.mutex.Lock() config.mutex.Lock()
mergeMeasurements(config, newMeasMap, newMeasContextMap) allDuplicates := mergeMeasurements(config, newMeasMap, newMeasContextMap)
if len(allDuplicates) > 0 {
logger.Warn(ctx, "some targets are duplicate and have been ignored in append operation", "clientID", clientID, "duplicates", allDuplicates)
// process repeat target in targetProcessResults and successfulTargets
targetProcessResults, successfulTargets = filterAndDeduplicateRepeatTargets(targetProcessResults, successfulTargets, allDuplicates)
}
config.mutex.Unlock() config.mutex.Unlock()
if len(successfulTargets) > 0 { if len(successfulTargets) > 0 {
@ -353,6 +365,29 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
return targetProcessResults, nil return targetProcessResults, nil
} }
func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsSlice []string, duplicates []string) ([]network.TargetResult, []string) {
filteredIDs := make([]string, 0, len(idsSlice))
set := make(map[string]struct{}, len(duplicates))
for _, duplicate := range duplicates {
set[duplicate] = struct{}{}
}
for index := range resultsSlice {
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
resultsSlice[index].Code = constants.SubRepeatCode
resultsSlice[index].Msg = constants.SubRepeatMsg
}
}
for _, id := range idsSlice {
if _, isTarget := set[id]; !isTarget {
filteredIDs = append(filteredIDs, id)
}
}
return resultsSlice, filteredIDs
}
// UpsertTargets define function to upsert targets in SharedSubState // UpsertTargets define function to upsert targets in SharedSubState
func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(measurements) requestTargetsCount := processRealTimeRequestCount(measurements)
@ -506,18 +541,18 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) {
// TODO 增加一个update 函数用来更新 interval // TODO 增加一个update 函数用来更新 interval
func processRealTimeRequestCount(components []network.RealTimeMeasurementItem) int { func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem) int {
totalTargetsCount := 0 totalTargetsCount := 0
for _, compItem := range components { for _, compItem := range measurements {
totalTargetsCount += len(compItem.Targets) totalTargetsCount += len(compItem.Targets)
} }
return totalTargetsCount return totalTargetsCount
} }
func processRealTimeRequestTargets(components []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult { func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
targetProcessResults := make([]network.TargetResult, 0, targetCount) targetProcessResults := make([]network.TargetResult, 0, targetCount)
for _, componentItem := range components { for _, measurementItem := range measurements {
for _, target := range componentItem.Targets { for _, target := range measurementItem.Targets {
var targetResult network.TargetResult var targetResult network.TargetResult
targetResult.ID = target targetResult.ID = target
targetResult.Code = constants.SubFailedCode targetResult.Code = constants.SubFailedCode

View File

@ -31,6 +31,6 @@ type TargetResult struct {
// RealTimeSubPayload define struct of real time data subscription request // RealTimeSubPayload define struct of real time data subscription request
type RealTimeSubPayload struct { type RealTimeSubPayload struct {
ClientID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` ClientID string `json:"client_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"`
TargetResults []TargetResult `json:"targets"` TargetResults []TargetResult `json:"targets"`
} }

View File

@ -16,3 +16,29 @@ func RemoveTargetsFromSliceSimple(targetsSlice []string, targetsToRemove []strin
} }
return targetsSlice return targetsSlice
} }
// SliceToSet define func to convert string slice to set
func SliceToSet(targetsSlice []string) map[string]struct{} {
set := make(map[string]struct{}, len(targetsSlice))
for _, target := range targetsSlice {
set[target] = struct{}{}
}
return set
}
// DeduplicateAndReportDuplicates define func to deduplicate a slice of strings and report duplicates
func DeduplicateAndReportDuplicates(targetsSlice []string, sourceSlice []string) (deduplicated []string, duplicates []string) {
targetSet := SliceToSet(targetsSlice)
deduplicated = make([]string, 0, len(sourceSlice))
// duplicate items slice
duplicates = make([]string, 0, len(sourceSlice))
for _, source := range sourceSlice {
if _, found := targetSet[source]; found {
duplicates = append(duplicates, source)
continue
}
deduplicated = append(deduplicated, source)
}
return deduplicated, duplicates
}