package api import ( "context" "datart/data/cl104" "datart/log" "encoding/json" "errors" "net/http" "sync" "sync/atomic" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) const ( pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 writeWait = 10 * time.Second ) type upSession struct { idx int conn *websocket.Conn ctx context.Context cancel context.CancelFunc ctrlCh chan wsMessage } type wsMessage struct { mt int data []byte } var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, WriteBufferPool: &sync.Pool{ New: func() any { return make([]byte, 1024) }, }, } var lock sync.Mutex var upConnNum int64 func (a *Api) Cl104Up(ctx *gin.Context) { lock.Lock() if upConnNum+1 > cl104.UpConnLimit { lock.Unlock() ctx.JSON(http.StatusNotAcceptable, nil) return } conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) if err != nil { lock.Unlock() log.Error(err) ctx.JSON(http.StatusInternalServerError, err.Error()) return } defer conn.Close() stopCtx, stopCancel := context.WithCancel(ctx.Request.Context()) defer stopCancel() session := &upSession{ idx: -1, conn: conn, ctx: stopCtx, cancel: stopCancel, ctrlCh: make(chan wsMessage, 16), } if err := setUpSessionConn104Up(session); err != nil { lock.Unlock() log.Error(err) ctx.JSON(http.StatusInternalServerError, err.Error()) return } idx, ok := cl104.AcquireUpSlot() if !ok { lock.Unlock() ctx.JSON(http.StatusServiceUnavailable, "no slot available") return } session.idx = idx atomic.AddInt64(&upConnNum, 1) lock.Unlock() start104UpWorkers(session) cl104.ReleaseUpSlot(session.idx) atomic.AddInt64(&upConnNum, -1) } func writeControl104Up(session *upSession, mt int, payload []byte) error { select { case <-session.ctx.Done(): return session.ctx.Err() case session.ctrlCh <- wsMessage{mt: mt, data: payload}: return nil case <-time.After(writeWait): return errors.New("write control timeout") } } func setUpSessionConn104Up(session *upSession) error { session.conn.SetPongHandler(func(appData string) error { session.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) session.conn.SetPingHandler(func(appData string) error { if err := writeControl104Up(session, websocket.PongMessage, []byte(appData)); err != nil { session.cancel() return err } return nil }) session.conn.SetCloseHandler(func(code int, text string) error { session.cancel() return nil }) return session.conn.SetReadDeadline(time.Now().Add(pongWait)) } func start104UpWorkers(session *upSession) { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() monitor104UpWrite(session) }() wg.Add(1) go func() { defer wg.Done() monitor104UpRead(session) }() wg.Add(1) go func() { defer wg.Done() sendPing104Up(session) }() wg.Wait() } func monitor104UpWrite(session *upSession) { var err error for { select { case <-session.ctx.Done(): return case ctrl := <-session.ctrlCh: err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { session.cancel() return } err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait)) if err != nil { session.cancel() return } case msg := <-cl104.UpChans[session.idx].UpChan: err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { session.cancel() return } err = session.conn.WriteMessage(websocket.TextMessage, msg) if err != nil { session.cancel() return } } } } func monitor104UpRead(session *upSession) { for { select { case <-session.ctx.Done(): return default: mt, rm, err := session.conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { session.cancel() return } if ce, ok := err.(*websocket.CloseError); ok { log.Infof("client closed with code:", ce.Code, "text:", ce.Text) session.cancel() return } log.Error("server read:", err) session.cancel() return } switch mt { case websocket.TextMessage: // TODO // if not type cl104.Msg? msg := new(cl104.Msg) err := json.Unmarshal(rm, msg) if err != nil { log.Error("server unmarshal:", err) session.cancel() return } data := append([]byte(nil), rm...) if ch, ok := cl104.CA2Chan[msg.CA]; ok { select { case ch <- data: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) } } else if ch, ok := cl104.CA2Chan[0]; ok { select { case ch <- data: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) } } default: log.Info("not text:", string(rm)) } } } } func sendPing104Up(session *upSession) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { select { case <-session.ctx.Done(): return case <-ticker.C: err := writeControl104Up(session, websocket.PingMessage, nil) if err != nil { session.cancel() return } } } }