From 535f56551971203409fc6f60492c67d3c0acee1d Mon Sep 17 00:00:00 2001 From: zhuxu Date: Wed, 24 Jun 2026 16:11:26 +0800 Subject: [PATCH] update for 104 --- data/cl104/cl104.go | 84 ++++++++++++++++++++++++++++++++++++++++++--- route/api/104up.go | 35 +++++++++++++++---- 2 files changed, 109 insertions(+), 10 deletions(-) diff --git a/data/cl104/cl104.go b/data/cl104/cl104.go index 0e7a1e3..a133cfc 100644 --- a/data/cl104/cl104.go +++ b/data/cl104/cl104.go @@ -4,6 +4,7 @@ import ( "context" "datart/config" "datart/log" + "sync" "time" "github.com/gorilla/websocket" @@ -24,7 +25,17 @@ type Msg struct { Infos []*info `json:"infos"` } -var UpChan = make(chan []byte, 64) +const UpConnLimit int64 = 10 + +type StatusCh struct { + Status byte + UpChan chan []byte +} + +var broadcastCh = make(chan []byte, 1024) +var UpChans []*StatusCh = make([]*StatusCh, UpConnLimit) +var UpChansMu sync.RWMutex + var cl2Chan = map[string]chan []byte{} var CA2Chan = map[int]chan []byte{} @@ -40,6 +51,29 @@ func init() { } } } + + for i := 0; i < len(UpChans); i++ { + UpChans[i] = &StatusCh{ + Status: 0, + UpChan: make(chan []byte, 16), + } + } + + go func() { + for data := range broadcastCh { + d := append([]byte(nil), data...) + UpChansMu.RLock() + for i := 0; i < len(UpChans); i++ { + if UpChans[i].Status == 1 { + select { + case UpChans[i].UpChan <- d: + default: + } + } + } + UpChansMu.RUnlock() + } + }() } func ConnectCLs(ctx context.Context) { @@ -59,7 +93,7 @@ func connectingCL(ctx context.Context, cl string, ch chan []byte) { } func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) { - c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil) + c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104", nil) if err != nil { log.Error("client dial:", err) return @@ -97,9 +131,9 @@ func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websock switch mt { case websocket.TextMessage: select { - case UpChan <- rm: + case broadcastCh <- rm: case <-time.After(time.Second * 5): - log.Error("drop msg:", string(rm)) + log.Error("drop cl msg:", string(rm)) } default: log.Error("invalid msg type:", mt) @@ -129,3 +163,45 @@ func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websoc } } } + +func AcquireUpSlot() (int, bool) { + UpChansMu.Lock() + defer UpChansMu.Unlock() + for i := 0; i < len(UpChans); i++ { + if UpChans[i].Status == 0 { + UpChans[i].Status = 1 + return i, true + } + } + return -1, false +} + +func ReleaseUpSlot(idx int) { + if idx < 0 || idx >= len(UpChans) { + return + } + + UpChansMu.Lock() + old := UpChans[idx].UpChan + UpChans[idx].UpChan = make(chan []byte, 16) + UpChans[idx].Status = 0 + UpChansMu.Unlock() + + cleanupDone := make(chan struct{}) + go func() { + defer close(cleanupDone) + for { + select { + case <-old: + continue + default: + return + } + } + }() + + select { + case <-cleanupDone: + case <-time.After(100 * time.Millisecond): + } +} diff --git a/route/api/104up.go b/route/api/104up.go index e585742..08b4208 100644 --- a/route/api/104up.go +++ b/route/api/104up.go @@ -22,6 +22,7 @@ const ( ) type upSession struct { + idx int conn *websocket.Conn ctx context.Context cancel context.CancelFunc @@ -43,17 +44,22 @@ var upgrader = websocket.Upgrader{ }, } +var lock sync.Mutex var upConnNum int64 func (a *Api) Cl104Up(ctx *gin.Context) { - if atomic.SwapInt64(&upConnNum, 1) > 0 { - ctx.JSON(http.StatusConflict, nil) + lock.Lock() + if upConnNum+1 > cl104.UpConnLimit { + lock.Unlock() + ctx.JSON(http.StatusNotAcceptable, nil) return } conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) if err != nil { + lock.Unlock() log.Error(err) + ctx.JSON(http.StatusInternalServerError, err.Error()) return } defer conn.Close() @@ -62,6 +68,7 @@ func (a *Api) Cl104Up(ctx *gin.Context) { defer stopCancel() session := &upSession{ + idx: -1, conn: conn, ctx: stopCtx, cancel: stopCancel, @@ -69,11 +76,27 @@ func (a *Api) Cl104Up(ctx *gin.Context) { } if err := setUpSessionConn104Up(session); err != nil { + lock.Unlock() log.Error(err) + ctx.JSON(http.StatusInternalServerError, err.Error()) return } + idx, ok := cl104.AcquireUpSlot() + if !ok { + lock.Unlock() + ctx.JSON(http.StatusServiceUnavailable, "no slot available") + return + } + session.idx = idx + + atomic.AddInt64(&upConnNum, 1) + lock.Unlock() + start104UpWorkers(session) + + cl104.ReleaseUpSlot(session.idx) + atomic.AddInt64(&upConnNum, -1) } func writeControl104Up(session *upSession, mt int, payload []byte) error { @@ -131,7 +154,6 @@ func start104UpWorkers(session *upSession) { }() wg.Wait() - atomic.SwapInt64(&upConnNum, 0) } func monitor104UpWrite(session *upSession) { @@ -151,7 +173,7 @@ func monitor104UpWrite(session *upSession) { session.cancel() return } - case msg := <-cl104.UpChan: + case msg := <-cl104.UpChans[session.idx].UpChan: err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { session.cancel() @@ -200,15 +222,16 @@ func monitor104UpRead(session *upSession) { return } + data := append([]byte(nil), rm...) if ch, ok := cl104.CA2Chan[msg.CA]; ok { select { - case ch <- rm: + case ch <- data: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) } } else if ch, ok := cl104.CA2Chan[0]; ok { select { - case ch <- rm: + case ch <- data: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) }