From d3b1f0afbe4b67e787eabbb78278d5752f103e00 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 14 Jan 2026 17:32:01 +0800 Subject: [PATCH] add code of send all target removed system signal in real time data pull api and fix bug of component attribute query api --- constants/resp_code.go | 3 ++ constants/subscription_business_code.go | 11 +++++ database/query_project_manager.go | 8 ++-- deploy/deploy.md | 6 +-- handler/component_attribute_query.go | 4 +- handler/component_attribute_update.go | 4 +- handler/real_time_data_pull.go | 58 ++++++++++++++++++------- orm/project_manager.go | 2 +- router/component.go | 3 +- router/router.go | 2 +- 10 files changed, 71 insertions(+), 30 deletions(-) diff --git a/constants/resp_code.go b/constants/resp_code.go index 2fa4d4d..3fca4fb 100644 --- a/constants/resp_code.go +++ b/constants/resp_code.go @@ -5,6 +5,9 @@ const ( // RespCodeSuccess define constant to indicates that the API was processed success RespCodeSuccess = 2000 + // RespCodeSuccessWithNoSub define constant to ndicates that the request was processed successfully, with all subscriptions removed for the given client_id. + RespCodeSuccessWithNoSub = 2101 + // RespCodeFailed define constant to indicates that the API was processed failed RespCodeFailed = 3000 diff --git a/constants/subscription_business_code.go b/constants/subscription_business_code.go index 80e0308..4b202bb 100644 --- a/constants/subscription_business_code.go +++ b/constants/subscription_business_code.go @@ -35,6 +35,17 @@ const ( UpdateSubFailedCode = "1009" ) +const ( + // SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine + SysCtrlPrefix = "SYS_CTRL_" + + // SysCtrlAllRemoved define to indicates that all active polling targets have been removed for the current client, and no further data streams are active + SysCtrlAllRemoved = "SYS_CTRL_ALL_REMOVED" + + // SysCtrlSessionExpired define to indicates reserved for indicating that the current websocket session has timed out or is no longer valid + SysCtrlSessionExpired = "SYS_CTRL_SESSION_EXPIRED" +) + const ( // SubSuccessMsg define subscription success message SubSuccessMsg = "subscription success" diff --git a/database/query_project_manager.go b/database/query_project_manager.go index 016ffa3..7dbcc33 100644 --- a/database/query_project_manager.go +++ b/database/query_project_manager.go @@ -58,11 +58,11 @@ func BatchGetProjectNames(db *gorm.DB, identifiers []orm.ProjectIdentifier) (map var projects []orm.ProjectManager queryArgs := make([][]any, len(identifiers)) for i, id := range identifiers { - queryArgs[i] = []any{id.Tag, id.MetaModel} + queryArgs[i] = []any{id.Tag, id.GroupName} } - err := db.Select("tag", "meta_model", "name"). - Where("(tag, meta_model) IN ?", queryArgs). + err := db.Select("tag", "group_name", "name"). + Where("(tag, group_name) IN ?", queryArgs). Find(&projects).Error if err != nil { return nil, err @@ -70,7 +70,7 @@ func BatchGetProjectNames(db *gorm.DB, identifiers []orm.ProjectIdentifier) (map resultMap := make(map[orm.ProjectIdentifier]string) for _, p := range projects { - key := orm.ProjectIdentifier{Tag: p.Tag, MetaModel: p.MetaModel} + key := orm.ProjectIdentifier{Tag: p.Tag, GroupName: p.GroupName} resultMap[key] = p.Name } diff --git a/deploy/deploy.md b/deploy/deploy.md index fd0506d..3364838 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -133,7 +133,7 @@ INSERT INTO public.component (global_uuid, nspath, tag, name, model_name, descri VALUES ( '968dd6e6-faec-4f78-b58a-d6e68426b09e', - 'ns1', 'tag1', 'component1', '', '', + 'ns1', 'tag1', 'component1', 'bus_1', '', 'grid1', 'zone1', 'station1', 1, -1, false, @@ -146,7 +146,7 @@ VALUES ), ( '968dd6e6-faec-4f78-b58a-d6e68426b08e', - 'ns2', 'tag2', 'component2', '', '', + 'ns2', 'tag2', 'component2', 'bus_1', '', 'grid1', 'zone1', 'station1', 1, -1, false, @@ -159,7 +159,7 @@ VALUES ), ( '968dd6e6-faec-4f78-b58a-d6e88426b09e', - 'ns3', 'tag3', 'component3', '', '', + 'ns3', 'tag3', 'component3', 'bus_1', '', 'grid1', 'zone1', 'station2', 2, -1, false, diff --git a/handler/component_attribute_query.go b/handler/component_attribute_query.go index 49ac3d1..f6296f4 100644 --- a/handler/component_attribute_query.go +++ b/handler/component_attribute_query.go @@ -109,7 +109,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) { items[i].attributeModelName = comp.ModelName items[i].globalUUID = comp.GlobalUUID identifiers = append(identifiers, orm.ProjectIdentifier{ - Token: items[i].token, Tag: comp.ModelName, MetaModel: items[i].attributeExtendType, + Token: items[i].token, Tag: comp.ModelName, GroupName: items[i].attributeExtendType, }) } } @@ -130,7 +130,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) { continue } - tbl, ok := tableNameMap[orm.ProjectIdentifier{Tag: item.attributeModelName, MetaModel: item.attributeExtendType}] + tbl, ok := tableNameMap[orm.ProjectIdentifier{Tag: item.attributeModelName, GroupName: item.attributeExtendType}] if !ok { queryResults[item.token] = queryResult{err: errcode.ErrFoundTargetFailed} continue diff --git a/handler/component_attribute_update.go b/handler/component_attribute_update.go index 90fdc1a..585f5ad 100644 --- a/handler/component_attribute_update.go +++ b/handler/component_attribute_update.go @@ -83,7 +83,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) { identifiers[i] = orm.ProjectIdentifier{ Token: mod.attributeToken, Tag: compInfo.ModelName, - MetaModel: mod.attributeExtendType, + GroupName: mod.attributeExtendType, } } tableNameMap, err := database.BatchGetProjectNames(tx, identifiers) @@ -103,7 +103,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) { redisUpdateMap := make(map[string][]cacheUpdateItem) for _, mod := range attriModifyConfs { - id := orm.ProjectIdentifier{Tag: compInfo.ModelName, MetaModel: mod.attributeExtendType} + id := orm.ProjectIdentifier{Tag: compInfo.ModelName, GroupName: mod.attributeExtendType} tableName, exists := tableNameMap[id] if !exists { updateResults[mod.attributeToken] = errcode.ErrFoundTargetFailed diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index f8fe232..30c009d 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -64,7 +64,7 @@ func PullRealTimeDataHandler(c *gin.Context) { fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize) sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize) - go processTargetPolling(ctx, globalSubState, clientID, fanInChan) + go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan) go readClientMessages(ctx, conn, clientID, cancel) go sendDataStream(ctx, conn, clientID, sendChan, cancel) defer close(sendChan) @@ -166,27 +166,34 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) { logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID) - for { - select { - case targetsData, ok := <-sendChan: - if !ok { - logger.Info(ctx, "send channel closed, sender goroutine exiting", "client_id", clientID) - return - } - if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil { - logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err) + for targetsData := range sendChan { + // TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展 + if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved { + err := conn.WriteJSON(map[string]any{ + "code": 2101, + "msg": "all targets removed in given client_id", + "payload": map[string]int{ + "active_targets_count": 0, + }, + }) + if err != nil { + logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err) cancel() - return } - case <-ctx.Done(): - logger.Info(ctx, "sender goroutine exiting as context is done", "client_id", clientID) + continue + } + + if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil { + logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err) + cancel() return } } + logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID) } // processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target -func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) { +func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) { // ensure the fanInChan will not leak defer close(fanInChan) logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID)) @@ -251,7 +258,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin case constants.OpAppend: appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) case constants.OpRemove: - removeTargets(ctx, stopChanMap, transportTargets.Targets) + removeTargets(ctx, stopChanMap, transportTargets.Targets, sendChan) case constants.OpUpdate: updateTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) } @@ -377,7 +384,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m } // removeTargets define func to stops running polling goroutines for targets that were removed -func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) { +func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) { for _, target := range removeTargets { stopChan, exists := stopChanMap[target] if !exists { @@ -389,6 +396,25 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re delete(stopChanMap, target) logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) } + + if len(stopChanMap) == 0 { + logger.Info(ctx, "all polling goroutines have been stopped for this client") + sendSpecialStatusToClient(ctx, sendChan) + } +} + +func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) { + specialTarget := network.RealTimePullTarget{ + ID: constants.SysCtrlAllRemoved, + Datas: []network.RealTimePullData{}, + } + + select { + case sendChan <- []network.RealTimePullTarget{specialTarget}: + logger.Info(ctx, "sent 2101 status request to sendChan") + default: + logger.Warn(ctx, "sendChan is full, skipping 2101 status message") + } } // stopAllPolling stops all running query goroutines for a specific client diff --git a/orm/project_manager.go b/orm/project_manager.go index aed567d..450f094 100644 --- a/orm/project_manager.go +++ b/orm/project_manager.go @@ -22,5 +22,5 @@ func (p *ProjectManager) TableName() string { type ProjectIdentifier struct { Token string Tag string - MetaModel string + GroupName string } diff --git a/router/component.go b/router/component.go index cf7d12f..8cd4c78 100644 --- a/router/component.go +++ b/router/component.go @@ -8,8 +8,9 @@ import ( ) // registerComponentRoutes define func of register component routes -func registerComponentRoutes(rg *gin.RouterGroup) { +func registerComponentRoutes(rg *gin.RouterGroup, middlewares ...gin.HandlerFunc) { g := rg.Group("/component/") + g.Use(middlewares...) g.GET("attribute/get/:tokens", handler.ComponentAttributeQueryHandler) g.POST("attribute/update", handler.ComponentAttributeUpdateHandler) } diff --git a/router/router.go b/router/router.go index f785202..6fbc113 100644 --- a/router/router.go +++ b/router/router.go @@ -26,5 +26,5 @@ func RegisterRoutes(engine *gin.Engine, clientToken string) { registerMeasurementRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken), measurementLimiter.Middleware) registerDataRoutes(routeGroup) registerMonitorRoutes(routeGroup) - registerComponentRoutes(routeGroup) + registerComponentRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken)) }