update for 104

This commit is contained in:
zhuxu 2026-06-24 16:11:26 +08:00
parent 04c6991403
commit 535f565519
2 changed files with 109 additions and 10 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"datart/config"
"datart/log"
"sync"
"time"
"github.com/gorilla/websocket"
@ -24,7 +25,17 @@ type Msg struct {
Infos []*info `json:"infos"`
}
var UpChan = make(chan []byte, 64)
const UpConnLimit int64 = 10
type StatusCh struct {
Status byte
UpChan chan []byte
}
var broadcastCh = make(chan []byte, 1024)
var UpChans []*StatusCh = make([]*StatusCh, UpConnLimit)
var UpChansMu sync.RWMutex
var cl2Chan = map[string]chan []byte{}
var CA2Chan = map[int]chan []byte{}
@ -40,6 +51,29 @@ func init() {
}
}
}
for i := 0; i < len(UpChans); i++ {
UpChans[i] = &StatusCh{
Status: 0,
UpChan: make(chan []byte, 16),
}
}
go func() {
for data := range broadcastCh {
d := append([]byte(nil), data...)
UpChansMu.RLock()
for i := 0; i < len(UpChans); i++ {
if UpChans[i].Status == 1 {
select {
case UpChans[i].UpChan <- d:
default:
}
}
}
UpChansMu.RUnlock()
}
}()
}
func ConnectCLs(ctx context.Context) {
@ -59,7 +93,7 @@ func connectingCL(ctx context.Context, cl string, ch chan []byte) {
}
func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) {
c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil)
c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104", nil)
if err != nil {
log.Error("client dial:", err)
return
@ -97,9 +131,9 @@ func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websock
switch mt {
case websocket.TextMessage:
select {
case UpChan <- rm:
case broadcastCh <- rm:
case <-time.After(time.Second * 5):
log.Error("drop msg:", string(rm))
log.Error("drop cl msg:", string(rm))
}
default:
log.Error("invalid msg type:", mt)
@ -129,3 +163,45 @@ func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websoc
}
}
}
func AcquireUpSlot() (int, bool) {
UpChansMu.Lock()
defer UpChansMu.Unlock()
for i := 0; i < len(UpChans); i++ {
if UpChans[i].Status == 0 {
UpChans[i].Status = 1
return i, true
}
}
return -1, false
}
func ReleaseUpSlot(idx int) {
if idx < 0 || idx >= len(UpChans) {
return
}
UpChansMu.Lock()
old := UpChans[idx].UpChan
UpChans[idx].UpChan = make(chan []byte, 16)
UpChans[idx].Status = 0
UpChansMu.Unlock()
cleanupDone := make(chan struct{})
go func() {
defer close(cleanupDone)
for {
select {
case <-old:
continue
default:
return
}
}
}()
select {
case <-cleanupDone:
case <-time.After(100 * time.Millisecond):
}
}

View File

@ -22,6 +22,7 @@ const (
)
type upSession struct {
idx int
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
@ -43,17 +44,22 @@ var upgrader = websocket.Upgrader{
},
}
var lock sync.Mutex
var upConnNum int64
func (a *Api) Cl104Up(ctx *gin.Context) {
if atomic.SwapInt64(&upConnNum, 1) > 0 {
ctx.JSON(http.StatusConflict, nil)
lock.Lock()
if upConnNum+1 > cl104.UpConnLimit {
lock.Unlock()
ctx.JSON(http.StatusNotAcceptable, nil)
return
}
conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
lock.Unlock()
log.Error(err)
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
defer conn.Close()
@ -62,6 +68,7 @@ func (a *Api) Cl104Up(ctx *gin.Context) {
defer stopCancel()
session := &upSession{
idx: -1,
conn: conn,
ctx: stopCtx,
cancel: stopCancel,
@ -69,11 +76,27 @@ func (a *Api) Cl104Up(ctx *gin.Context) {
}
if err := setUpSessionConn104Up(session); err != nil {
lock.Unlock()
log.Error(err)
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
idx, ok := cl104.AcquireUpSlot()
if !ok {
lock.Unlock()
ctx.JSON(http.StatusServiceUnavailable, "no slot available")
return
}
session.idx = idx
atomic.AddInt64(&upConnNum, 1)
lock.Unlock()
start104UpWorkers(session)
cl104.ReleaseUpSlot(session.idx)
atomic.AddInt64(&upConnNum, -1)
}
func writeControl104Up(session *upSession, mt int, payload []byte) error {
@ -131,7 +154,6 @@ func start104UpWorkers(session *upSession) {
}()
wg.Wait()
atomic.SwapInt64(&upConnNum, 0)
}
func monitor104UpWrite(session *upSession) {
@ -151,7 +173,7 @@ func monitor104UpWrite(session *upSession) {
session.cancel()
return
}
case msg := <-cl104.UpChan:
case msg := <-cl104.UpChans[session.idx].UpChan:
err = session.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
session.cancel()
@ -200,15 +222,16 @@ func monitor104UpRead(session *upSession) {
return
}
data := append([]byte(nil), rm...)
if ch, ok := cl104.CA2Chan[msg.CA]; ok {
select {
case ch <- rm:
case ch <- data:
case <-time.After(time.Second * 5):
log.Error("drop msg:", msg)
}
} else if ch, ok := cl104.CA2Chan[0]; ok {
select {
case ch <- rm:
case ch <- data:
case <-time.After(time.Second * 5):
log.Error("drop msg:", msg)
}