// Package handler provides HTTP handlers for various endpoints. package handler import ( "context" "fmt" "maps" "net/http" "sync" "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" "modelRT/orm" "github.com/gin-gonic/gin" "github.com/gofrs/uuid" "gorm.io/gorm" ) var globalMonitorState *SharedMonitorState func init() { globalMonitorState = NewSharedMonitorState() } // RealTimeSubHandler define real time data subscriptions process API // @Summary 开始或结束订阅实时数据 // @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于实时数据的监控 // @Tags RealTime Component // @Accept json // @Produce json // @Param request body network.MeasurementRecommendRequest true "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'" // @Success 200 {object} network.SuccessResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" // // @Example 200 { // "code": 200, // "msg": "success", // "payload": { // "targets": [ // { // "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", // "code": "1001", // "msg": "subscription success" // }, // { // "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", // "code": "1002", // "msg": "subscription failed" // } // ] // } // } // // @Failure 400 {object} network.FailureResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" // // @Example 400 { // "code": 400, // "msg": "failed to get recommend data from redis", // "payload": { // "targets": [ // { // "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", // "code": "1002", // "msg": "subscription failed" // }, // { // "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", // "code": "1002", // "msg": "subscription failed" // } // ] // } // } // // @Router /monitors/data/subscriptions [post] func RealTimeSubHandler(c *gin.Context) { var request network.RealTimeSubRequest var monitorAction string var clientID string if err := c.ShouldBindJSON(&request); err != nil { logger.Error(c, "failed to unmarshal real time query request", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } if request.Action == constants.MonitorStartAction && request.ClientID == "" { monitorAction = request.Action id, err := uuid.NewV4() if err != nil { logger.Error(c, "failed to generate monitor id", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } clientID = id.String() } else if request.Action == constants.MonitorStartAction && request.ClientID != "" { monitorAction = constants.MonitorAppendAction clientID = request.ClientID } else if request.Action == constants.MonitorStopAction && request.ClientID != "" { monitorAction = request.Action clientID = request.ClientID } pgClient := database.GetPostgresDBClient() // open transaction tx := pgClient.Begin() defer tx.Commit() switch monitorAction { case constants.MonitorStartAction: results, err := globalMonitorState.CreateConfig(c, tx, clientID, request.Components) if err != nil { logger.Error(c, "create real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return } c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return case constants.MonitorStopAction: results, err := globalMonitorState.RemoveTargets(c, clientID, request.Components) if err != nil { logger.Error(c, "remove target to real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return } c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return case constants.MonitorAppendAction: results, err := globalMonitorState.AppendTargets(c, tx, clientID, request.Components) if err != nil { logger.Error(c, "append target to real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return } c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return default: err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action) logger.Error(c, "unsupported action of real time data monitor request", "error", err) requestTargetsCount := processRealTimeRequestCount(request.Components) results := processRealTimeRequestTargets(request.Components, requestTargetsCount, err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, }) return } } // RealTimeMonitorComponent define struct of real time monitor component type RealTimeMonitorComponent struct { targets []string targetParam map[string]*orm.Measurement } // RealTimeMonitorConfig define struct of real time monitor config type RealTimeMonitorConfig struct { noticeChan chan *transportTargets mutex sync.RWMutex components map[string]*RealTimeMonitorComponent } // SharedMonitorState define struct of shared monitor state with mutex type SharedMonitorState struct { monitorMap map[string]*RealTimeMonitorConfig globalMutex sync.RWMutex } // NewSharedMonitorState define function to create new SharedMonitorState func NewSharedMonitorState() *SharedMonitorState { return &SharedMonitorState{ monitorMap: make(map[string]*RealTimeMonitorConfig), } } // processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken) func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) ( []network.TargetResult, map[string]*RealTimeMonitorComponent, []string, ) { targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum) newComponentsMap := make(map[string]*RealTimeMonitorComponent) successfulTargets := make([]string, 0, allReqTargetNum) for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { var targetResult network.TargetResult targetResult.ID = target targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) if err != nil { logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) targetResult.Code = constants.SubFailedCode targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) targetProcessResults = append(targetProcessResults, targetResult) continue } targetResult.Code = constants.SubSuccessCode targetResult.Msg = constants.SubSuccessMsg targetProcessResults = append(targetProcessResults, targetResult) successfulTargets = append(successfulTargets, target) if _, ok := newComponentsMap[interval]; !ok { newComponentsMap[interval] = &RealTimeMonitorComponent{ targets: make([]string, 0, len(componentItem.Targets)), targetParam: make(map[string]*orm.Measurement), } } comp := newComponentsMap[interval] comp.targets = append(comp.targets, target) comp.targetParam[target] = targetModel.GetMeasurementInfo() } } return targetProcessResults, newComponentsMap, successfulTargets } // mergeComponents define func to merge newComponentsMap into existingComponentsMap func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, newComponents map[string]*RealTimeMonitorComponent) { for interval, newComp := range newComponents { if existingComp, ok := existingComponents[interval]; ok { existingComp.targets = append(existingComp.targets, newComp.targets...) maps.Copy(existingComp.targetParam, newComp.targetParam) } else { existingComponents[interval] = newComp } } } // CreateConfig define function to create config in SharedMonitorState func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount) s.globalMutex.Lock() if _, exist := s.monitorMap[clientID]; exist { s.globalMutex.Unlock() err := fmt.Errorf("clientID %s already exists. use AppendTargets to modify existing config", clientID) logger.Error(ctx, "clientID already exists. use AppendTargets to modify existing config", "error", err) return targetProcessResults, err } config := &RealTimeMonitorConfig{ noticeChan: make(chan *transportTargets), components: newComponentsMap, // 直接使用预构建的 Map } s.monitorMap[clientID] = config s.globalMutex.Unlock() return targetProcessResults, nil } // AppendTargets define function to append targets in SharedMonitorState func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() config, exist := s.monitorMap[clientID] if !exist { s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID) logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err) return processRealTimeRequestTargets(components, requestTargetsCount, err), err } s.globalMutex.RUnlock() targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) config.mutex.Lock() mergeComponents(config.components, newComponentsMap) config.mutex.Unlock() if len(successfulTargets) > 0 { transportTargets := &transportTargets{ OperationType: constants.OpAppend, Targets: successfulTargets, } config.noticeChan <- transportTargets } transportTargets := &transportTargets{ OperationType: constants.OpAppend, Targets: make([]string, requestTargetsCount), } config.mutex.Lock() for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { var targetResult network.TargetResult targetResult.ID = target targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) if err != nil { logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) targetResult.Code = constants.SubFailedCode targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) targetProcessResults = append(targetProcessResults, targetResult) continue } targetResult.Code = constants.SubSuccessCode targetResult.Msg = constants.SubSuccessMsg targetProcessResults = append(targetProcessResults, targetResult) if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) targetParam := make(map[string]*orm.Measurement) targetParam[target] = targetModel.GetMeasurementInfo() config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), } } else { comp.targets = append(comp.targets, target) comp.targetParam[target] = targetModel.GetMeasurementInfo() } transportTargets.Targets = append(transportTargets.Targets, target) } } config.mutex.Unlock() config.noticeChan <- transportTargets return targetProcessResults, nil } // UpsertTargets define function to upsert targets in SharedMonitorState func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) s.globalMutex.RLock() config, exist := s.monitorMap[clientID] s.globalMutex.RUnlock() var opType constants.TargetOperationType if exist { opType = constants.OpUpdate config.mutex.Lock() mergeComponents(config.components, newComponentsMap) config.mutex.Unlock() } else { opType = constants.OpAppend s.globalMutex.Lock() if config, exist = s.monitorMap[clientID]; !exist { config = &RealTimeMonitorConfig{ noticeChan: make(chan *transportTargets), components: newComponentsMap, } s.monitorMap[clientID] = config } else { s.globalMutex.Unlock() config.mutex.Lock() mergeComponents(config.components, newComponentsMap) config.mutex.Unlock() } s.globalMutex.Unlock() } if len(successfulTargets) > 0 { transportTargets := &transportTargets{ OperationType: opType, Targets: successfulTargets, } config.noticeChan <- transportTargets } return targetProcessResults, nil } // RemoveTargets define function to remove targets in SharedMonitorState func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() config, exist := s.monitorMap[clientID] if !exist { s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found", clientID) logger.Error(ctx, "clientID not found in remove targets operation", "error", err) return processRealTimeRequestTargets(components, requestTargetsCount, err), err } s.globalMutex.RUnlock() var shouldRemoveClient bool // components is the list of items to be removed passed in the request transportTargets := &transportTargets{ OperationType: constants.OpRemove, Targets: make([]string, 0, requestTargetsCount), } config.mutex.Lock() for _, compent := range components { interval := compent.Interval // comp is the locally running listener configuration comp, compExist := config.components[interval] if !compExist { logger.Error(ctx, fmt.Sprintf("component with interval %s not found under clientID %s", interval, clientID), "clientID", clientID, "interval", interval) for _, target := range compent.Targets { targetResult := network.TargetResult{ ID: target, Code: constants.CancelSubFailedCode, Msg: constants.CancelSubFailedMsg, } targetProcessResults = append(targetProcessResults, targetResult) } continue } targetsToRemoveMap := make(map[string]struct{}) for _, target := range compent.Targets { targetsToRemoveMap[target] = struct{}{} } var newTargets []string for _, existingTarget := range comp.targets { if _, found := targetsToRemoveMap[existingTarget]; !found { newTargets = append(newTargets, existingTarget) } else { transportTargets.Targets = append(transportTargets.Targets, existingTarget) targetResult := network.TargetResult{ ID: existingTarget, Code: constants.CancelSubSuccessCode, Msg: constants.CancelSubSuccessMsg, } targetProcessResults = append(targetProcessResults, targetResult) delete(targetsToRemoveMap, existingTarget) delete(comp.targetParam, existingTarget) } } comp.targets = newTargets if len(comp.targets) == 0 { delete(config.components, interval) } if len(config.components) == 0 { shouldRemoveClient = true } if len(targetsToRemoveMap) > 0 { err := fmt.Errorf("target remove were not found under clientID %s and interval %s", clientID, interval) for target := range targetsToRemoveMap { targetResult := network.TargetResult{ ID: target, Code: constants.CancelSubFailedCode, Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()), } targetProcessResults = append(targetProcessResults, targetResult) } } } config.mutex.Unlock() // pass the removed subscription configuration to the notice channel config.noticeChan <- transportTargets if shouldRemoveClient { s.globalMutex.Lock() if currentConfig, exist := s.monitorMap[clientID]; exist && len(currentConfig.components) == 0 { delete(s.monitorMap, clientID) } s.globalMutex.Unlock() } return targetProcessResults, nil } // Get define function to get subscriptions config from SharedMonitorState func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) { s.globalMutex.RLock() defer s.globalMutex.RUnlock() config, ok := s.monitorMap[clientID] if !ok { return nil, false } return config, true } // TODO 增加一个update 函数用来更新 interval func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { totalTargetsCount := 0 for _, compItem := range components { totalTargetsCount += len(compItem.Targets) } return totalTargetsCount } func processRealTimeRequestTargets(components []network.RealTimeComponentItem, targetCount int, err error) []network.TargetResult { targetProcessResults := make([]network.TargetResult, 0, targetCount) for _, componentItem := range components { for _, target := range componentItem.Targets { var targetResult network.TargetResult targetResult.ID = target targetResult.Code = constants.SubFailedCode targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) targetProcessResults = append(targetProcessResults, targetResult) } } return targetProcessResults } // transportTargets define struct to transport update or remove target to real // time pull api type transportTargets struct { OperationType constants.TargetOperationType Targets []string }