package ws import ( "context" "datart/data/cl104" "datart/log" "encoding/json" "errors" "net/http" "sync" "sync/atomic" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) const ( pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 writeWait = 10 * time.Second ) type upSession struct { conn *websocket.Conn ctx context.Context cancel context.CancelFunc ctrlCh chan wsMessage } type wsMessage struct { mt int data []byte } var upConnNum int64 func (w *Ws) Cl104Up(ctx *gin.Context) { if atomic.SwapInt64(&upConnNum, 1) > 0 { ctx.JSON(http.StatusConflict, nil) return } conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) if err != nil { log.Error(err) return } defer conn.Close() stopCtx, stopCancel := context.WithCancel(ctx.Request.Context()) defer stopCancel() session := &upSession{ conn: conn, ctx: stopCtx, cancel: stopCancel, ctrlCh: make(chan wsMessage, 16), } if err := setUpSessionConn104(session); err != nil { log.Error(err) return } startUpWorkers(session) } func writeControl104(session *upSession, mt int, payload []byte) error { select { case <-session.ctx.Done(): return session.ctx.Err() case session.ctrlCh <- wsMessage{mt: mt, data: payload}: return nil case <-time.After(writeWait): return errors.New("write control timeout") } } func setUpSessionConn104(session *upSession) error { session.conn.SetPongHandler(func(appData string) error { session.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) session.conn.SetPingHandler(func(appData string) error { if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil { session.cancel() return err } return nil }) session.conn.SetCloseHandler(func(code int, text string) error { session.cancel() return nil }) return session.conn.SetReadDeadline(time.Now().Add(pongWait)) } func startUpWorkers(session *upSession) { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() monitorUpWrite(session) }() wg.Add(1) go func() { defer wg.Done() monitorUpRead(session) }() wg.Add(1) go func() { defer wg.Done() sendPing104(session) }() wg.Wait() atomic.SwapInt64(&upConnNum, 0) } func monitorUpWrite(session *upSession) { var err error for { select { case <-session.ctx.Done(): return case ctrl := <-session.ctrlCh: err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { session.cancel() return } err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait)) if err != nil { session.cancel() return } case msg := <-cl104.UpChan: err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { session.cancel() return } err = session.conn.WriteMessage(websocket.TextMessage, msg) if err != nil { session.cancel() return } } } } func monitorUpRead(session *upSession) { for { select { case <-session.ctx.Done(): return default: mt, rm, err := session.conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { session.cancel() return } if ce, ok := err.(*websocket.CloseError); ok { log.Infof("client closed with code:", ce.Code, "text:", ce.Text) session.cancel() return } log.Error("server read:", err) session.cancel() return } switch mt { case websocket.TextMessage: // TODO // if not type cl104.Msg? msg := new(cl104.Msg) err := json.Unmarshal(rm, msg) if err != nil { log.Error("server unmarshal:", err) session.cancel() return } if ch, ok := cl104.CA2Chan[msg.CA]; ok { select { case ch <- rm: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) } } else if ch, ok := cl104.CA2Chan[0]; ok { select { case ch <- rm: case <-time.After(time.Second * 5): log.Error("drop msg:", msg) } } default: log.Info("not text:", string(rm)) } } } } func sendPing104(session *upSession) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { select { case <-session.ctx.Done(): return case <-ticker.C: err := writeControl104(session, websocket.PingMessage, nil) if err != nil { session.cancel() return } } } }