// dataRT/route/ws/104up.go
//
// 232 lines
// 4.3 KiB
// Go

package ws
import (
"context"
"datart/data/cl104"
"datart/log"
"encoding/json"
"errors"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Timing parameters for the uplink WebSocket session.
const (
	// writeWait bounds a single write to the peer and the time a control
	// frame may wait to be queued for the writer goroutine.
	writeWait = 10 * time.Second

	// pongWait is the read deadline applied to the connection; it is
	// renewed whenever a pong arrives.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings. It is kept at 9/10 of
	// pongWait so a ping is sent before the read deadline can expire.
	pingPeriod = pongWait * 9 / 10
)
// upSession bundles the state shared by the goroutines that serve one
// uplink WebSocket connection.
type upSession struct {
// conn is the upgraded WebSocket connection.
conn *websocket.Conn
// ctx is cancelled to tell the session's goroutines to stop.
ctx context.Context
// cancel cancels ctx; any goroutine may call it to end the session.
cancel context.CancelFunc
// ctrlCh queues control frames (ping/pong) for the writer goroutine.
ctrlCh chan wsMessage
}
// wsMessage is a frame queued on upSession.ctrlCh for the writer goroutine.
type wsMessage struct {
// mt is the WebSocket message type passed to the connection's write call.
mt int
// data is the frame payload.
data []byte
}
// upConnNum is non-zero while an uplink session holds the single session
// slot. It is accessed only through sync/atomic.
var upConnNum int64
// Cl104Up upgrades the request to a WebSocket and serves the uplink session
// on it until the session ends.
//
// Only one uplink session may exist at a time: a request that arrives while
// another session holds the slot is answered with 409 Conflict. The slot is
// released on every exit path, including a failed upgrade or a failed
// connection setup, so one failed attempt cannot lock out later clients.
func (w *Ws) Cl104Up(ctx *gin.Context) {
	// Claim the single session slot atomically.
	if !atomic.CompareAndSwapInt64(&upConnNum, 0, 1) {
		ctx.JSON(http.StatusConflict, nil)
		return
	}
	// Registered first so it runs last, after the connection has been closed.
	defer atomic.StoreInt64(&upConnNum, 0)

	conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
	if err != nil {
		log.Error(err)
		return
	}
	defer conn.Close()

	stopCtx, stopCancel := context.WithCancel(ctx.Request.Context())
	defer stopCancel()

	session := &upSession{
		conn:   conn,
		ctx:    stopCtx,
		cancel: stopCancel,
		ctrlCh: make(chan wsMessage, 16),
	}
	if err := setUpSessionConn104(session); err != nil {
		log.Error(err)
		return
	}

	// Blocks until every session goroutine has finished.
	startUpWorkers(session)
}
// writeControl104 queues a control frame of type mt with the given payload
// for the session's writer goroutine.
//
// It returns nil once the frame has been queued, the context error if the
// session is cancelled first, or a timeout error if the queue stays full for
// writeWait.
func writeControl104(session *upSession, mt int, payload []byte) error {
	frame := wsMessage{mt: mt, data: payload}
	expired := time.After(writeWait)

	select {
	case session.ctrlCh <- frame:
		return nil
	case <-session.ctx.Done():
		return session.ctx.Err()
	case <-expired:
		return errors.New("write control timeout")
	}
}
// setUpSessionConn104 installs the pong, ping and close handlers on the
// session's connection and arms the initial read deadline.
//
// It returns the error from setting the initial read deadline, if any.
func setUpSessionConn104(session *upSession) error {
	conn := session.conn

	// Each pong pushes the read deadline out by pongWait. A failure to set
	// the deadline is returned so the read loop sees it instead of running
	// on with a stale deadline.
	conn.SetPongHandler(func(string) error {
		return conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	// Pongs are queued for the writer goroutine rather than written here.
	conn.SetPingHandler(func(appData string) error {
		if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil {
			session.cancel()
			return err
		}
		return nil
	})

	// A custom close handler replaces the library default, which is what
	// normally echoes the close frame; echo it here so the peer sees a
	// completed closing handshake. WriteControl may be called concurrently
	// with the writer goroutine.
	conn.SetCloseHandler(func(code int, text string) error {
		// Best effort: the peer may already be gone, and the session ends
		// either way.
		_ = conn.WriteControl(
			websocket.CloseMessage,
			websocket.FormatCloseMessage(code, ""),
			time.Now().Add(writeWait),
		)
		session.cancel()
		return nil
	})

	return conn.SetReadDeadline(time.Now().Add(pongWait))
}
// startUpWorkers runs the session's writer, reader and ping goroutines and
// blocks until all of them have returned, then marks the session slot free.
//
// The connection is closed as soon as the session context is cancelled.
// Without that, a reader blocked in ReadMessage would only notice the
// cancellation when its read deadline expired, keeping the slot occupied for
// up to pongWait after the session had effectively ended.
func startUpWorkers(session *upSession) {
	workers := []func(*upSession){monitorUpWrite, monitorUpRead, sendPing104}

	var wg sync.WaitGroup
	for _, run := range workers {
		wg.Add(1)
		go func(run func(*upSession)) {
			defer wg.Done()
			run(session)
		}(run)
	}

	// Unblock any in-flight read or write once the session is cancelled.
	// Close is safe to call concurrently with the other connection methods.
	closed := make(chan struct{})
	go func() {
		defer close(closed)
		<-session.ctx.Done()
		// The caller closes the connection again on return; the error from
		// either close carries no useful information here.
		_ = session.conn.Close()
	}()

	wg.Wait()
	// Make sure the closer goroutine terminates even if no worker cancelled.
	session.cancel()
	<-closed

	atomic.StoreInt64(&upConnNum, 0)
}
// monitorUpWrite is the session's writer loop. It sends queued control
// frames from session.ctrlCh and uplink payloads from cl104.UpChan (as text
// messages) to the peer.
//
// The loop ends when the session context is cancelled or a write fails. A
// write failure is logged and cancels the session.
func monitorUpWrite(session *upSession) {
	for {
		var err error

		select {
		case <-session.ctx.Done():
			return

		case ctrl := <-session.ctrlCh:
			// WriteControl takes its own deadline, so no SetWriteDeadline
			// call is needed for control frames.
			err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait))

		case msg := <-cl104.UpChan:
			if err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)); err == nil {
				err = session.conn.WriteMessage(websocket.TextMessage, msg)
			}
		}

		if err != nil {
			// Skip the log when the session was already cancelled: the
			// failure is then a side effect of shutdown, not its cause.
			if session.ctx.Err() == nil {
				log.Error("server write:", err)
			}
			session.cancel()
			return
		}
	}
}
// monitorUpRead is the session's reader loop. It reads messages from the
// peer and forwards each text message to the channel registered in
// cl104.CA2Chan for the message's CA, falling back to the channel registered
// under key 0. Non-text messages are only logged.
//
// The loop ends when the session context is cancelled, a read fails, or a
// text message cannot be decoded; the latter two cancel the session.
func monitorUpRead(session *upSession) {
	for {
		select {
		case <-session.ctx.Done():
			return
		default:
		}

		mt, rm, err := session.conn.ReadMessage()
		if err != nil {
			var ce *websocket.CloseError
			switch {
			case websocket.IsCloseError(err, websocket.CloseNormalClosure):
				// Normal closure: nothing to report.
			case errors.As(err, &ce):
				log.Info("client closed with code:", ce.Code, "text:", ce.Text)
			default:
				log.Error("server read:", err)
			}
			session.cancel()
			return
		}

		if mt != websocket.TextMessage {
			log.Info("not text:", string(rm))
			continue
		}

		// TODO: decide how to treat text payloads that are not a cl104.Msg;
		// for now a decode failure ends the session.
		msg := new(cl104.Msg)
		if err := json.Unmarshal(rm, msg); err != nil {
			log.Error("server unmarshal:", err)
			session.cancel()
			return
		}

		ch, ok := cl104.CA2Chan[msg.CA]
		if !ok {
			ch, ok = cl104.CA2Chan[0]
		}
		if !ok {
			// No receiver registered for this CA and no fallback.
			continue
		}

		// Give the receiver up to five seconds to accept the raw message,
		// but stop waiting as soon as the session is cancelled.
		timer := time.NewTimer(5 * time.Second)
		select {
		case ch <- rm:
		case <-session.ctx.Done():
		case <-timer.C:
			log.Error("drop msg:", msg)
		}
		timer.Stop()
	}
}
// sendPing104 queues a ping for the peer every pingPeriod until the session
// context is cancelled. If a ping cannot be queued, it cancels the session
// and returns.
func sendPing104(session *upSession) {
	tick := time.NewTicker(pingPeriod)
	defer tick.Stop()

	done := session.ctx.Done()
	for {
		// Wait for the next tick or for the end of the session.
		select {
		case <-done:
			return
		case <-tick.C:
		}

		if err := writeControl104(session, websocket.PingMessage, nil); err != nil {
			session.cancel()
			return
		}
	}
}