add code of send all target removed system signal in real time data pull api and fix bug of component attribute query api

This commit is contained in:
douxu 2026-01-14 17:32:01 +08:00
parent cf880279e4
commit d3b1f0afbe
10 changed files with 71 additions and 30 deletions

View File

@ -5,6 +5,9 @@ const (
// RespCodeSuccess define constant to indicates that the API was processed success // RespCodeSuccess define constant to indicates that the API was processed success
RespCodeSuccess = 2000 RespCodeSuccess = 2000
// RespCodeSuccessWithNoSub define constant to ndicates that the request was processed successfully, with all subscriptions removed for the given client_id.
RespCodeSuccessWithNoSub = 2101
// RespCodeFailed define constant to indicates that the API was processed failed // RespCodeFailed define constant to indicates that the API was processed failed
RespCodeFailed = 3000 RespCodeFailed = 3000

View File

@ -35,6 +35,17 @@ const (
UpdateSubFailedCode = "1009" UpdateSubFailedCode = "1009"
) )
const (
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
SysCtrlPrefix = "SYS_CTRL_"
// SysCtrlAllRemoved define to indicates that all active polling targets have been removed for the current client, and no further data streams are active
SysCtrlAllRemoved = "SYS_CTRL_ALL_REMOVED"
// SysCtrlSessionExpired define to indicates reserved for indicating that the current websocket session has timed out or is no longer valid
SysCtrlSessionExpired = "SYS_CTRL_SESSION_EXPIRED"
)
const ( const (
// SubSuccessMsg define subscription success message // SubSuccessMsg define subscription success message
SubSuccessMsg = "subscription success" SubSuccessMsg = "subscription success"

View File

@ -58,11 +58,11 @@ func BatchGetProjectNames(db *gorm.DB, identifiers []orm.ProjectIdentifier) (map
var projects []orm.ProjectManager var projects []orm.ProjectManager
queryArgs := make([][]any, len(identifiers)) queryArgs := make([][]any, len(identifiers))
for i, id := range identifiers { for i, id := range identifiers {
queryArgs[i] = []any{id.Tag, id.MetaModel} queryArgs[i] = []any{id.Tag, id.GroupName}
} }
err := db.Select("tag", "meta_model", "name"). err := db.Select("tag", "group_name", "name").
Where("(tag, meta_model) IN ?", queryArgs). Where("(tag, group_name) IN ?", queryArgs).
Find(&projects).Error Find(&projects).Error
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,7 +70,7 @@ func BatchGetProjectNames(db *gorm.DB, identifiers []orm.ProjectIdentifier) (map
resultMap := make(map[orm.ProjectIdentifier]string) resultMap := make(map[orm.ProjectIdentifier]string)
for _, p := range projects { for _, p := range projects {
key := orm.ProjectIdentifier{Tag: p.Tag, MetaModel: p.MetaModel} key := orm.ProjectIdentifier{Tag: p.Tag, GroupName: p.GroupName}
resultMap[key] = p.Name resultMap[key] = p.Name
} }

View File

@ -133,7 +133,7 @@ INSERT INTO public.component (global_uuid, nspath, tag, name, model_name, descri
VALUES VALUES
( (
'968dd6e6-faec-4f78-b58a-d6e68426b09e', '968dd6e6-faec-4f78-b58a-d6e68426b09e',
'ns1', 'tag1', 'component1', '', '', 'ns1', 'tag1', 'component1', 'bus_1', '',
'grid1', 'zone1', 'station1', 1, 'grid1', 'zone1', 'station1', 1,
-1, -1,
false, false,
@ -146,7 +146,7 @@ VALUES
), ),
( (
'968dd6e6-faec-4f78-b58a-d6e68426b08e', '968dd6e6-faec-4f78-b58a-d6e68426b08e',
'ns2', 'tag2', 'component2', '', '', 'ns2', 'tag2', 'component2', 'bus_1', '',
'grid1', 'zone1', 'station1', 1, 'grid1', 'zone1', 'station1', 1,
-1, -1,
false, false,
@ -159,7 +159,7 @@ VALUES
), ),
( (
'968dd6e6-faec-4f78-b58a-d6e88426b09e', '968dd6e6-faec-4f78-b58a-d6e88426b09e',
'ns3', 'tag3', 'component3', '', '', 'ns3', 'tag3', 'component3', 'bus_1', '',
'grid1', 'zone1', 'station2', 2, 'grid1', 'zone1', 'station2', 2,
-1, -1,
false, false,

View File

@ -109,7 +109,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
items[i].attributeModelName = comp.ModelName items[i].attributeModelName = comp.ModelName
items[i].globalUUID = comp.GlobalUUID items[i].globalUUID = comp.GlobalUUID
identifiers = append(identifiers, orm.ProjectIdentifier{ identifiers = append(identifiers, orm.ProjectIdentifier{
Token: items[i].token, Tag: comp.ModelName, MetaModel: items[i].attributeExtendType, Token: items[i].token, Tag: comp.ModelName, GroupName: items[i].attributeExtendType,
}) })
} }
} }
@ -130,7 +130,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
continue continue
} }
tbl, ok := tableNameMap[orm.ProjectIdentifier{Tag: item.attributeModelName, MetaModel: item.attributeExtendType}] tbl, ok := tableNameMap[orm.ProjectIdentifier{Tag: item.attributeModelName, GroupName: item.attributeExtendType}]
if !ok { if !ok {
queryResults[item.token] = queryResult{err: errcode.ErrFoundTargetFailed} queryResults[item.token] = queryResult{err: errcode.ErrFoundTargetFailed}
continue continue

View File

@ -83,7 +83,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
identifiers[i] = orm.ProjectIdentifier{ identifiers[i] = orm.ProjectIdentifier{
Token: mod.attributeToken, Token: mod.attributeToken,
Tag: compInfo.ModelName, Tag: compInfo.ModelName,
MetaModel: mod.attributeExtendType, GroupName: mod.attributeExtendType,
} }
} }
tableNameMap, err := database.BatchGetProjectNames(tx, identifiers) tableNameMap, err := database.BatchGetProjectNames(tx, identifiers)
@ -103,7 +103,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
redisUpdateMap := make(map[string][]cacheUpdateItem) redisUpdateMap := make(map[string][]cacheUpdateItem)
for _, mod := range attriModifyConfs { for _, mod := range attriModifyConfs {
id := orm.ProjectIdentifier{Tag: compInfo.ModelName, MetaModel: mod.attributeExtendType} id := orm.ProjectIdentifier{Tag: compInfo.ModelName, GroupName: mod.attributeExtendType}
tableName, exists := tableNameMap[id] tableName, exists := tableNameMap[id]
if !exists { if !exists {
updateResults[mod.attributeToken] = errcode.ErrFoundTargetFailed updateResults[mod.attributeToken] = errcode.ErrFoundTargetFailed

View File

@ -64,7 +64,7 @@ func PullRealTimeDataHandler(c *gin.Context) {
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize) fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize) sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
go processTargetPolling(ctx, globalSubState, clientID, fanInChan) go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
go readClientMessages(ctx, conn, clientID, cancel) go readClientMessages(ctx, conn, clientID, cancel)
go sendDataStream(ctx, conn, clientID, sendChan, cancel) go sendDataStream(ctx, conn, clientID, sendChan, cancel)
defer close(sendChan) defer close(sendChan)
@ -166,27 +166,34 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) { func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID) logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for { for targetsData := range sendChan {
select { // TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
case targetsData, ok := <-sendChan: if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
if !ok { err := conn.WriteJSON(map[string]any{
logger.Info(ctx, "send channel closed, sender goroutine exiting", "client_id", clientID) "code": 2101,
return "msg": "all targets removed in given client_id",
} "payload": map[string]int{
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil { "active_targets_count": 0,
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err) },
})
if err != nil {
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
cancel() cancel()
return
} }
case <-ctx.Done(): continue
logger.Info(ctx, "sender goroutine exiting as context is done", "client_id", clientID) }
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
cancel()
return return
} }
} }
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
} }
// processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target // processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) { func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
// ensure the fanInChan will not leak // ensure the fanInChan will not leak
defer close(fanInChan) defer close(fanInChan)
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID)) logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
@ -251,7 +258,7 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin
case constants.OpAppend: case constants.OpAppend:
appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
case constants.OpRemove: case constants.OpRemove:
removeTargets(ctx, stopChanMap, transportTargets.Targets) removeTargets(ctx, stopChanMap, transportTargets.Targets, sendChan)
case constants.OpUpdate: case constants.OpUpdate:
updateTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) updateTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets)
} }
@ -377,7 +384,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
} }
// removeTargets define func to stops running polling goroutines for targets that were removed // removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string) { func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
for _, target := range removeTargets { for _, target := range removeTargets {
stopChan, exists := stopChanMap[target] stopChan, exists := stopChanMap[target]
if !exists { if !exists {
@ -389,6 +396,25 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
delete(stopChanMap, target) delete(stopChanMap, target)
logger.Info(ctx, "stopped polling goroutine for removed target", "target", target) logger.Info(ctx, "stopped polling goroutine for removed target", "target", target)
} }
if len(stopChanMap) == 0 {
logger.Info(ctx, "all polling goroutines have been stopped for this client")
sendSpecialStatusToClient(ctx, sendChan)
}
}
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
specialTarget := network.RealTimePullTarget{
ID: constants.SysCtrlAllRemoved,
Datas: []network.RealTimePullData{},
}
select {
case sendChan <- []network.RealTimePullTarget{specialTarget}:
logger.Info(ctx, "sent 2101 status request to sendChan")
default:
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
}
} }
// stopAllPolling stops all running query goroutines for a specific client // stopAllPolling stops all running query goroutines for a specific client

View File

@ -22,5 +22,5 @@ func (p *ProjectManager) TableName() string {
type ProjectIdentifier struct { type ProjectIdentifier struct {
Token string Token string
Tag string Tag string
MetaModel string GroupName string
} }

View File

@ -8,8 +8,9 @@ import (
) )
// registerComponentRoutes define func of register component routes // registerComponentRoutes define func of register component routes
func registerComponentRoutes(rg *gin.RouterGroup) { func registerComponentRoutes(rg *gin.RouterGroup, middlewares ...gin.HandlerFunc) {
g := rg.Group("/component/") g := rg.Group("/component/")
g.Use(middlewares...)
g.GET("attribute/get/:tokens", handler.ComponentAttributeQueryHandler) g.GET("attribute/get/:tokens", handler.ComponentAttributeQueryHandler)
g.POST("attribute/update", handler.ComponentAttributeUpdateHandler) g.POST("attribute/update", handler.ComponentAttributeUpdateHandler)
} }

View File

@ -26,5 +26,5 @@ func RegisterRoutes(engine *gin.Engine, clientToken string) {
registerMeasurementRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken), measurementLimiter.Middleware) registerMeasurementRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken), measurementLimiter.Middleware)
registerDataRoutes(routeGroup) registerDataRoutes(routeGroup)
registerMonitorRoutes(routeGroup) registerMonitorRoutes(routeGroup)
registerComponentRoutes(routeGroup) registerComponentRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
} }