optimzie code of constants package

This commit is contained in:
douxu 2026-01-16 17:08:28 +08:00
parent d3b1f0afbe
commit ba5e5b3d1c
2 changed files with 2 additions and 3 deletions

View File

@ -164,6 +164,7 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network
return conn.WriteJSON(response)
}
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for targetsData := range sendChan {
@ -192,7 +193,7 @@ func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string,
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
}
// processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
// ensure the fanInChan will not leak
defer close(fanInChan)
@ -207,8 +208,6 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin
}
s.globalMutex.RUnlock()
logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.measurements)
config.mutex.RLock()
for interval, measurementTargets := range config.measurements {
for _, target := range measurementTargets {