extracting duplicate code snippets to form a common function

This commit is contained in:
douxu 2025-11-11 17:37:06 +08:00
parent 93d1eea61f
commit a82e02126d
3 changed files with 242 additions and 174 deletions

View File

@ -7,7 +7,7 @@ import (
"github.com/redis/go-redis/v9"
)
// RedisClient define struct to create redis client
// RedisClient define struct to accessing redis data that does not require the use of distributed locks
type RedisClient struct {
Client *redis.Client
}

View File

@ -127,14 +127,16 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
defer close(fanInChan)
stopChanMap := make(map[string]chan struct{})
s.mutex.RLock()
s.globalMutex.RLock()
config, confExist := s.monitorMap[clientID]
if !confExist {
logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID)
s.globalMutex.RUnlock()
return
}
s.globalMutex.RUnlock()
config.mutex.RLock()
for interval, componentItems := range config.components {
for _, target := range componentItems.targets {
// add a secondary check to prevent the target from already existing in the stopChanMap
@ -158,16 +160,16 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
continue
}
config := redisPollingConfig{
pollingConfig := redisPollingConfig{
targetID: target,
queryKey: queryKey,
interval: interval,
dataSize: int64(measurement.Size),
}
go realTimeDataQueryFromRedis(ctx, config, fanInChan, queryGStopChan)
go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
}
}
s.mutex.RUnlock()
config.mutex.RUnlock()
for {
select {
@ -177,19 +179,14 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
stopAllPolling(ctx, stopChanMap)
return
}
config.mutex.Lock()
switch transportTargets.OperationType {
case constants.OpAppend:
// TODO 考虑精细化锁结果将RW锁置于ClientID层面之下
s.mutex.Lock()
defer s.mutex.Unlock()
// TODO 增加 append 函数调用
fmt.Println(transportTargets.Targets)
appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
case constants.OpRemove:
s.mutex.Lock()
defer s.mutex.Unlock()
// TODO 增加 remove 函数调用
fmt.Println(transportTargets.Targets)
removeTargets(ctx, stopChanMap, transportTargets.Targets)
}
config.mutex.Unlock()
case <-ctx.Done():
logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID))
stopAllPolling(ctx, stopChanMap)
@ -198,6 +195,75 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
}
}
// appendTargets starts new polling goroutines for targets that were just added
func appendTargets(ctx context.Context, config *RealTimeMonitorConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
targetSet := make(map[string]struct{}, len(appendTargets))
for _, target := range appendTargets {
targetSet[target] = struct{}{}
}
for interval, componentItems := range config.components {
for _, target := range componentItems.targets {
if _, needsToAdd := targetSet[target]; !needsToAdd {
continue
}
if _, exists := stopChanMap[target]; exists {
logger.Warn(ctx, "append target already running, skipping", "target", target)
continue
}
measurement, exist := componentItems.targetParam[target]
if !exist {
logger.Error(ctx, "append target can not find measurement params for new target", "target", target)
continue
}
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
if err != nil {
logger.Error(ctx, "append target generate measurement identifier failed", "target", target, "error", err)
continue
}
pollingConfig := redisPollingConfig{
targetID: target,
queryKey: queryKey,
interval: interval,
dataSize: int64(measurement.Size),
}
queryGStopChan := make(chan struct{})
stopChanMap[target] = queryGStopChan
go realTimeDataQueryFromRedis(ctx, pollingConfig, fanInChan, queryGStopChan)
logger.Info(ctx, "started new polling goroutine for appended target", "target", target, "interval", interval)
delete(targetSet, target)
}
}
for target := range targetSet {
logger.Error(ctx, "append target: failed to find config for target, goroutine not started", "target", target)
}
}
// removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, targetsToRemove []string) {
for _, target := range targetsToRemove {
stopChan, exists := stopChanMap[target]
if !exists {
logger.Warn(ctx, "removeTarget was not running, skipping", "target", target)
continue
}
close(stopChan)
delete(stopChanMap, target)
logger.Info(ctx, "stopped polling goroutine for removed target", "target", target)
}
}
// stopAllPolling stops all running query goroutines for a specific client
func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
for target, stopChan := range stopChanMap {
logger.Info(ctx, fmt.Sprintf("stop the data fetching behavior for the corresponding target:%s", target))

View File

@ -4,6 +4,7 @@ package handler
import (
"context"
"fmt"
"maps"
"net/http"
"sync"
@ -77,7 +78,7 @@ func init() {
func RealTimeSubHandler(c *gin.Context) {
var request network.RealTimeSubRequest
var monitorAction string
var monitorID string
var clientID string
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(c, "failed to unmarshal real time query request", "error", err)
@ -99,13 +100,13 @@ func RealTimeSubHandler(c *gin.Context) {
})
return
}
monitorID = id.String()
clientID = id.String()
} else if request.Action == constants.MonitorStartAction && request.ClientID != "" {
monitorAction = constants.MonitorAppendAction
monitorID = request.ClientID
clientID = request.ClientID
} else if request.Action == constants.MonitorStopAction && request.ClientID != "" {
monitorAction = request.Action
monitorID = request.ClientID
clientID = request.ClientID
}
pgClient := database.GetPostgresDBClient()
@ -115,14 +116,14 @@ func RealTimeSubHandler(c *gin.Context) {
switch monitorAction {
case constants.MonitorStartAction:
results, err := globalMonitorState.CreateConfig(c, tx, monitorID, request.Components)
results, err := globalMonitorState.CreateConfig(c, tx, clientID, request.Components)
if err != nil {
logger.Error(c, "create real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
@ -133,20 +134,20 @@ func RealTimeSubHandler(c *gin.Context) {
Code: http.StatusOK,
Msg: "success",
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.MonitorStopAction:
results, err := globalMonitorState.RemoveTargets(c, monitorID, request.Components)
results, err := globalMonitorState.RemoveTargets(c, clientID, request.Components)
if err != nil {
logger.Error(c, "remove target to real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
@ -157,20 +158,20 @@ func RealTimeSubHandler(c *gin.Context) {
Code: http.StatusOK,
Msg: "success",
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.MonitorAppendAction:
results, err := globalMonitorState.AppendTargets(c, tx, monitorID, request.Components)
results, err := globalMonitorState.AppendTargets(c, tx, clientID, request.Components)
if err != nil {
logger.Error(c, "append target to real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
@ -181,7 +182,7 @@ func RealTimeSubHandler(c *gin.Context) {
Code: http.StatusOK,
Msg: "success",
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
@ -195,7 +196,7 @@ func RealTimeSubHandler(c *gin.Context) {
Code: http.StatusBadRequest,
Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
ClientID: monitorID,
ClientID: clientID,
TargetResults: results,
},
})
@ -212,13 +213,14 @@ type RealTimeMonitorComponent struct {
// RealTimeMonitorConfig define struct of real time monitor config
type RealTimeMonitorConfig struct {
noticeChan chan *transportTargets
mutex sync.RWMutex
components map[string]*RealTimeMonitorComponent
}
// SharedMonitorState define struct of shared monitor state with mutex
type SharedMonitorState struct {
monitorMap map[string]*RealTimeMonitorConfig
mutex sync.RWMutex
globalMutex sync.RWMutex
}
// NewSharedMonitorState define function to create new SharedMonitorState
@ -228,31 +230,22 @@ func NewSharedMonitorState() *SharedMonitorState {
}
}
// CreateConfig define function to create config in SharedMonitorState
func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
if _, exist := s.monitorMap[monitorID]; exist {
err := fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID)
logger.Error(ctx, "monitorID already exists. Use AppendTargets to modify existing config", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
}
config := &RealTimeMonitorConfig{
noticeChan: make(chan *transportTargets),
components: make(map[string]*RealTimeMonitorComponent),
}
// processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken)
func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) (
[]network.TargetResult,
map[string]*RealTimeMonitorComponent,
[]string,
) {
targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum)
newComponentsMap := make(map[string]*RealTimeMonitorComponent)
successfulTargets := make([]string, 0, allReqTargetNum)
for _, componentItem := range components {
interval := componentItem.Interval
for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
@ -261,48 +254,94 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]*orm.Measurement)
targetParam[target] = targetModel.GetMeasurementInfo()
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
targetParam: targetParam,
successfulTargets = append(successfulTargets, target)
if _, ok := newComponentsMap[interval]; !ok {
newComponentsMap[interval] = &RealTimeMonitorComponent{
targets: make([]string, 0, len(componentItem.Targets)),
targetParam: make(map[string]*orm.Measurement),
}
} else {
}
comp := newComponentsMap[interval]
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo()
}
}
return targetProcessResults, newComponentsMap, successfulTargets
}
s.monitorMap[monitorID] = config
// mergeComponents define func to merge newComponentsMap into existingComponentsMap
func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, newComponents map[string]*RealTimeMonitorComponent) {
for interval, newComp := range newComponents {
if existingComp, ok := existingComponents[interval]; ok {
existingComp.targets = append(existingComp.targets, newComp.targets...)
maps.Copy(existingComp.targetParam, newComp.targetParam)
} else {
existingComponents[interval] = newComp
}
}
}
// CreateConfig define function to create config in SharedMonitorState
func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
s.globalMutex.Lock()
if _, exist := s.monitorMap[clientID]; exist {
s.globalMutex.Unlock()
err := fmt.Errorf("clientID %s already exists. use AppendTargets to modify existing config", clientID)
logger.Error(ctx, "clientID already exists. use AppendTargets to modify existing config", "error", err)
return targetProcessResults, err
}
config := &RealTimeMonitorConfig{
noticeChan: make(chan *transportTargets),
components: newComponentsMap, // 直接使用预构建的 Map
}
s.monitorMap[clientID] = config
s.globalMutex.Unlock()
return targetProcessResults, nil
}
// AppendTargets define function to append targets in SharedMonitorState
func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
config, exist := s.monitorMap[monitorID]
s.globalMutex.RLock()
config, exist := s.monitorMap[clientID]
if !exist {
err := fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID)
logger.Error(ctx, "monitorID not found. Use CreateConfig to start a new config", "error", err)
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
}
s.globalMutex.RUnlock()
targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
config.mutex.Lock()
mergeComponents(config.components, newComponentsMap)
config.mutex.Unlock()
if len(successfulTargets) > 0 {
transportTargets := &transportTargets{
OperationType: constants.OpAppend,
Targets: successfulTargets,
}
config.noticeChan <- transportTargets
}
transportTargets := &transportTargets{
OperationType: constants.OpAppend,
Targets: make([]string, requestTargetsCount),
}
config.mutex.Lock()
for _, componentItem := range components {
interval := componentItem.Interval
for _, target := range componentItem.Targets {
@ -335,141 +374,84 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, mon
transportTargets.Targets = append(transportTargets.Targets, target)
}
}
config.mutex.Unlock()
config.noticeChan <- transportTargets
return targetProcessResults, nil
}
// UpsertTargets define function to upsert targets in SharedMonitorState
func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
config, exist := s.monitorMap[monitorID]
if !exist {
// create new config
s.globalMutex.RLock()
config, exist := s.monitorMap[clientID]
s.globalMutex.RUnlock()
var opType constants.TargetOperationType
if exist {
opType = constants.OpUpdate
config.mutex.Lock()
mergeComponents(config.components, newComponentsMap)
config.mutex.Unlock()
} else {
opType = constants.OpAppend
s.globalMutex.Lock()
if config, exist = s.monitorMap[clientID]; !exist {
config = &RealTimeMonitorConfig{
noticeChan: make(chan *transportTargets),
components: make(map[string]*RealTimeMonitorComponent),
}
targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components))
for _, componentItem := range components {
interval := componentItem.Interval
for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]*orm.Measurement)
targetParam[target] = targetModel.GetMeasurementInfo()
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
components: newComponentsMap,
}
s.monitorMap[clientID] = config
} else {
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo()
s.globalMutex.Unlock()
config.mutex.Lock()
mergeComponents(config.components, newComponentsMap)
config.mutex.Unlock()
}
}
}
s.monitorMap[monitorID] = config
return targetProcessResults, nil
s.globalMutex.Unlock()
}
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
if len(successfulTargets) > 0 {
transportTargets := &transportTargets{
OperationType: constants.OpUpdate,
Targets: make([]string, requestTargetsCount),
OperationType: opType,
Targets: successfulTargets,
}
for _, componentItem := range components {
interval := componentItem.Interval
for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]*orm.Measurement)
targetParam[target] = targetModel.GetMeasurementInfo()
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
}
} else {
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo()
}
transportTargets.Targets = append(transportTargets.Targets, target)
}
}
config.noticeChan <- transportTargets
}
return targetProcessResults, nil
}
// Get define function to get subscriptions config from SharedMonitorState
func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
config, ok := s.monitorMap[clientID]
if !ok {
return nil, false
}
return config, true
}
// RemoveTargets define function to remove targets in SharedMonitorState
func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
config, exist := s.monitorMap[monitorID]
s.globalMutex.RLock()
config, exist := s.monitorMap[clientID]
if !exist {
err := fmt.Errorf("monitorID %s not found", monitorID)
logger.Error(ctx, "monitorID not found in remove targets operation", "error", err)
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
}
s.globalMutex.RUnlock()
var shouldRemoveClient bool
// components is the list of items to be removed passed in the request
transportTargets := &transportTargets{
OperationType: constants.OpRemove,
Targets: make([]string, requestTargetsCount),
Targets: make([]string, 0, requestTargetsCount),
}
config.mutex.Lock()
for _, compent := range components {
interval := compent.Interval
// comp is the locally running listener configuration
comp, compExist := config.components[interval]
if !compExist {
logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval)
for _, target := range comp.targets {
logger.Error(ctx, fmt.Sprintf("component with interval %s not found under clientID %s", interval, clientID), "clientID", clientID, "interval", interval)
for _, target := range compent.Targets {
targetResult := network.TargetResult{
ID: target,
Code: constants.CancelSubFailedCode,
@ -508,11 +490,11 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
}
if len(config.components) == 0 {
delete(s.monitorMap, monitorID)
shouldRemoveClient = true
}
if len(targetsToRemoveMap) > 0 {
err := fmt.Errorf("target remove were not found under monitorID %s and interval %s", monitorID, interval)
err := fmt.Errorf("target remove were not found under clientID %s and interval %s", clientID, interval)
for target := range targetsToRemoveMap {
targetResult := network.TargetResult{
ID: target,
@ -523,12 +505,32 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
}
}
}
config.mutex.Unlock()
// pass the removed subscription configuration to the notice channel
config.noticeChan <- transportTargets
if shouldRemoveClient {
s.globalMutex.Lock()
if currentConfig, exist := s.monitorMap[clientID]; exist && len(currentConfig.components) == 0 {
delete(s.monitorMap, clientID)
}
s.globalMutex.Unlock()
}
return targetProcessResults, nil
}
// Get define function to get subscriptions config from SharedMonitorState
func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) {
s.globalMutex.RLock()
defer s.globalMutex.RUnlock()
config, ok := s.monitorMap[clientID]
if !ok {
return nil, false
}
return config, true
}
// TODO 增加一个update 函数用来更新 interval
func processRealTimeRequestCount(components []network.RealTimeComponentItem) int {