diff --git a/config/cl104.go b/config/cl104.go new file mode 100644 index 0000000..f5b47db --- /dev/null +++ b/config/cl104.go @@ -0,0 +1,5 @@ +package config + +func cl104ConfigName() string { + return "cl104.json" +} diff --git a/config/config.go b/config/config.go index 2b4d14e..00e9743 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type config struct { redisConf map[string]*redisConfig mongoConf map[string]*mongoConfig rabbitConf map[string][]*rabbitConfig + cl104Conf map[string]map[string][]int } var conf *config @@ -52,6 +53,10 @@ func init() { conf.rabbitConf = make(map[string][]*rabbitConfig) rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName() conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf) + + conf.cl104Conf = make(map[string]map[string][]int) + cl104Conf := confDir + string(os.PathSeparator) + cl104ConfigName() + conf.unmarshalJsonFile(cl104Conf, &conf.cl104Conf) } func Conf() *config { @@ -107,6 +112,13 @@ func (c *config) RabbitConf(tag string) []*rabbitConfig { return c.rabbitConf[tag] } +func (c *config) CL104ConfAll() map[string]map[string][]int { + if c == nil { + panic("config is nil") + } + return c.cl104Conf +} + func (c *config) unmarshalJsonFile(file string, dest any) { if filejson, err := os.ReadFile(file); err != nil { panic(err.Error()) diff --git a/configs/cl104.json b/configs/cl104.json new file mode 100644 index 0000000..4ce0806 --- /dev/null +++ b/configs/cl104.json @@ -0,0 +1,5 @@ +{ + "station000":{ + "127.0.0.1:8899":[0] + } +} \ No newline at end of file diff --git a/data/cl104/cl104.go b/data/cl104/cl104.go new file mode 100644 index 0000000..9e6e216 --- /dev/null +++ b/data/cl104/cl104.go @@ -0,0 +1,131 @@ +package cl104 + +import ( + "context" + "datart/config" + "datart/log" + "time" + + "github.com/gorilla/websocket" +) + +type info struct { + IOA int `json:"ioa"` + Val float64 `json:"val"` + Q int `json:"q"` + MS int64 `json:"ms"` +} + +type Msg struct { + TI int `json:"ti"` + COT int `json:"cot"` + PN int `json:"pn"` + CA int `json:"ca"` + Infos []*info `json:"infos"` +} + +var UpChan = make(chan []byte, 64) +var cl2Chan = map[string]chan []byte{} +var CA2Chan = map[int]chan []byte{} + +func init() { + conf := config.Conf().CL104ConfAll() + + for _, addr2CAs := range conf { + for addr, cas := range addr2CAs { + clChan := make(chan []byte, 16) + cl2Chan[addr] = clChan + for _, ca := range cas { + CA2Chan[ca] = clChan + } + } + } +} + +func ConnectCLs() { + for cl, ch := range cl2Chan { + go func(ctx context.Context, cl string, ch chan []byte) { + connectingCL(ctx, cl, ch) + }(context.Background(), cl, ch) + } +} + +func connectingCL(ctx context.Context, cl string, ch chan []byte) { + for { + time.Sleep(time.Second * 5) + subctx, cancel := context.WithCancel(ctx) + newConnectCL(subctx, cancel, cl, ch) + } +} + +func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) { + c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil) + if err != nil { + log.Error("client dial:", err) + return + } + defer c.Close() + + go monitoringCLRead(ctx, cancel, c) + go monitoringCLWrite(ctx, cancel, c, ch) + + <-ctx.Done() +} + +func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn) { + for { + select { + case <-ctx.Done(): + return + default: + mt, rm, err := c.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + cancel() + return + } + if ce, ok := err.(*websocket.CloseError); ok { + log.Info("client closed with code:", ce.Code, "text:", ce.Text) + cancel() + return + } + log.Error("server read:", err) + cancel() + return + } + + switch mt { + case websocket.TextMessage: + select { + case UpChan <- rm: + case <-time.After(time.Second * 5): + log.Error("drop msg:", string(rm)) + } + default: + log.Error("invalid msg type:", mt) + } + } + } +} + +func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn, ch chan []byte) { + var err error + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * 54): + err = c.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second*10)) + if err != nil { + cancel() + return + } + case msg := <-ch: + err = c.WriteMessage(websocket.TextMessage, msg) + if err != nil { + cancel() + return + } + } + } +} diff --git a/data/data.go b/data/data.go index ce2558e..2b1437d 100644 --- a/data/data.go +++ b/data/data.go @@ -2,6 +2,7 @@ package data import ( "context" + "datart/data/cl104" "datart/data/influx" "datart/data/mongo" "datart/data/postgres" @@ -27,6 +28,8 @@ func (p *Processes) StartDataProcessing() { } updatingRedisPhasor(ctx) + + cl104.ConnectCLs() } func (p *Processes) Cancel(ctx context.Context) { diff --git a/go.mod b/go.mod index 2f40a2d..91cf436 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/gin-gonic/gin v1.11.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 github.com/redis/go-redis/v9 v9.14.0 go.mongodb.org/mongo-driver/v2 v2.3.0 diff --git a/go.sum b/go.sum index 4ca6138..04f4197 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,8 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/route/route.go b/route/route.go index 29bfa54..e889770 100644 --- a/route/route.go +++ b/route/route.go @@ -3,6 +3,7 @@ package route import ( "datart/route/admin" "datart/route/api" + "datart/route/ws" "github.com/gin-gonic/gin" ) @@ -20,4 +21,8 @@ func LoadRoute(engine *gin.Engine) { d := new(admin.Admin) gd := engine.Group("admin") gd.POST("/command", d.PostExecuteCommand) + + w := new(ws.Ws) + gw := engine.Group("ws") + gw.GET("/104up", w.Cl104Up) } diff --git a/route/ws/104up.go b/route/ws/104up.go new file mode 100644 index 0000000..b6f436d --- /dev/null +++ b/route/ws/104up.go @@ -0,0 +1,231 @@ +package ws + +import ( + "context" + "datart/data/cl104" + "datart/log" + "encoding/json" + "errors" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +const ( + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + writeWait = 10 * time.Second +) + +type upSession struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + ctrlCh chan wsMessage +} + +type wsMessage struct { + mt int + data []byte +} + +var upConnNum int64 + +func (w *Ws) Cl104Up(ctx *gin.Context) { + if atomic.SwapInt64(&upConnNum, 1) > 0 { + ctx.JSON(http.StatusConflict, nil) + return + } + + conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) + if err != nil { + log.Error(err) + return + } + defer conn.Close() + + stopCtx, stopCancel := context.WithCancel(ctx.Request.Context()) + defer stopCancel() + + session := &upSession{ + conn: conn, + ctx: stopCtx, + cancel: stopCancel, + ctrlCh: make(chan wsMessage, 16), + } + + if err := setUpSessionConn104(session); err != nil { + log.Error(err) + return + } + + startUpWorkers(session) +} + +func writeControl104(session *upSession, mt int, payload []byte) error { + select { + case <-session.ctx.Done(): + return session.ctx.Err() + case session.ctrlCh <- wsMessage{mt: mt, data: payload}: + return nil + case <-time.After(writeWait): + return errors.New("write control timeout") + } +} + +func setUpSessionConn104(session *upSession) error { + session.conn.SetPongHandler(func(appData string) error { + session.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + session.conn.SetPingHandler(func(appData string) error { + if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil { + session.cancel() + return err + } + return nil + }) + + session.conn.SetCloseHandler(func(code int, text string) error { + session.cancel() + return nil + }) + + return session.conn.SetReadDeadline(time.Now().Add(pongWait)) +} + +func startUpWorkers(session *upSession) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + monitorUpWrite(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + monitorUpRead(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + sendPing104(session) + }() + + wg.Wait() + atomic.SwapInt64(&upConnNum, 0) +} + +func monitorUpWrite(session *upSession) { + var err error + for { + select { + case <-session.ctx.Done(): + return + case ctrl := <-session.ctrlCh: + err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait)) + if err != nil { + session.cancel() + return + } + case msg := <-cl104.UpChan: + err = session.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + session.cancel() + return + } + } + } +} + +func monitorUpRead(session *upSession) { + for { + select { + case <-session.ctx.Done(): + return + default: + mt, rm, err := session.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + session.cancel() + return + } + if ce, ok := err.(*websocket.CloseError); ok { + log.Infof("client closed with code:", ce.Code, "text:", ce.Text) + session.cancel() + return + } + log.Error("server read:", err) + session.cancel() + return + } + + switch mt { + case websocket.TextMessage: + // TODO + // if not type cl104.Msg? + msg := new(cl104.Msg) + err := json.Unmarshal(rm, msg) + if err != nil { + log.Error("server unmarshal:", err) + session.cancel() + return + } + + if ch, ok := cl104.CA2Chan[msg.CA]; ok { + select { + case ch <- rm: + case <-time.After(time.Second * 5): + log.Error("drop msg:", msg) + } + } else if ch, ok := cl104.CA2Chan[0]; ok { + select { + case ch <- rm: + case <-time.After(time.Second * 5): + log.Error("drop msg:", msg) + } + } + + default: + log.Info("not text:", string(rm)) + } + } + } +} + +func sendPing104(session *upSession) { + + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-session.ctx.Done(): + return + case <-ticker.C: + err := writeControl104(session, websocket.PingMessage, nil) + if err != nil { + session.cancel() + return + } + } + } +} diff --git a/route/ws/ws.go b/route/ws/ws.go new file mode 100644 index 0000000..80a0903 --- /dev/null +++ b/route/ws/ws.go @@ -0,0 +1,19 @@ +package ws + +import ( + "sync" + + "github.com/gorilla/websocket" +) + +type Ws struct{} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 4096, + WriteBufferPool: &sync.Pool{ + New: func() any { + return make([]byte, 4096) + }, + }, +}