handle 104 data
This commit is contained in:
parent
ebd375d6c7
commit
a4711c553b
|
|
@ -0,0 +1,5 @@
|
|||
package config
|
||||
|
||||
func cl104ConfigName() string {
|
||||
return "cl104.json"
|
||||
}
|
||||
|
|
@ -14,6 +14,7 @@ type config struct {
|
|||
redisConf map[string]*redisConfig
|
||||
mongoConf map[string]*mongoConfig
|
||||
rabbitConf map[string][]*rabbitConfig
|
||||
cl104Conf map[string]map[string][]int
|
||||
}
|
||||
|
||||
var conf *config
|
||||
|
|
@ -52,6 +53,10 @@ func init() {
|
|||
conf.rabbitConf = make(map[string][]*rabbitConfig)
|
||||
rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName()
|
||||
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
|
||||
|
||||
conf.cl104Conf = make(map[string]map[string][]int)
|
||||
cl104Conf := confDir + string(os.PathSeparator) + cl104ConfigName()
|
||||
conf.unmarshalJsonFile(cl104Conf, &conf.cl104Conf)
|
||||
}
|
||||
|
||||
func Conf() *config {
|
||||
|
|
@ -107,6 +112,13 @@ func (c *config) RabbitConf(tag string) []*rabbitConfig {
|
|||
return c.rabbitConf[tag]
|
||||
}
|
||||
|
||||
func (c *config) CL104ConfAll() map[string]map[string][]int {
|
||||
if c == nil {
|
||||
panic("config is nil")
|
||||
}
|
||||
return c.cl104Conf
|
||||
}
|
||||
|
||||
func (c *config) unmarshalJsonFile(file string, dest any) {
|
||||
if filejson, err := os.ReadFile(file); err != nil {
|
||||
panic(err.Error())
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"station000":{
|
||||
"127.0.0.1:8899":[0]
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
package cl104
|
||||
|
||||
import (
|
||||
"context"
|
||||
"datart/config"
|
||||
"datart/log"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type info struct {
|
||||
IOA int `json:"ioa"`
|
||||
Val float64 `json:"val"`
|
||||
Q int `json:"q"`
|
||||
MS int64 `json:"ms"`
|
||||
}
|
||||
|
||||
type Msg struct {
|
||||
TI int `json:"ti"`
|
||||
COT int `json:"cot"`
|
||||
PN int `json:"pn"`
|
||||
CA int `json:"ca"`
|
||||
Infos []*info `json:"infos"`
|
||||
}
|
||||
|
||||
var UpChan = make(chan []byte, 64)
|
||||
var cl2Chan = map[string]chan []byte{}
|
||||
var CA2Chan = map[int]chan []byte{}
|
||||
|
||||
func init() {
|
||||
conf := config.Conf().CL104ConfAll()
|
||||
|
||||
for _, addr2CAs := range conf {
|
||||
for addr, cas := range addr2CAs {
|
||||
clChan := make(chan []byte, 16)
|
||||
cl2Chan[addr] = clChan
|
||||
for _, ca := range cas {
|
||||
CA2Chan[ca] = clChan
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ConnectCLs() {
|
||||
for cl, ch := range cl2Chan {
|
||||
go func(ctx context.Context, cl string, ch chan []byte) {
|
||||
connectingCL(ctx, cl, ch)
|
||||
}(context.Background(), cl, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func connectingCL(ctx context.Context, cl string, ch chan []byte) {
|
||||
for {
|
||||
time.Sleep(time.Second * 5)
|
||||
subctx, cancel := context.WithCancel(ctx)
|
||||
newConnectCL(subctx, cancel, cl, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) {
|
||||
c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil)
|
||||
if err != nil {
|
||||
log.Error("client dial:", err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
go monitoringCLRead(ctx, cancel, c)
|
||||
go monitoringCLWrite(ctx, cancel, c, ch)
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
mt, rm, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
if ce, ok := err.(*websocket.CloseError); ok {
|
||||
log.Info("client closed with code:", ce.Code, "text:", ce.Text)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
log.Error("server read:", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
switch mt {
|
||||
case websocket.TextMessage:
|
||||
select {
|
||||
case UpChan <- rm:
|
||||
case <-time.After(time.Second * 5):
|
||||
log.Error("drop msg:", string(rm))
|
||||
}
|
||||
default:
|
||||
log.Error("invalid msg type:", mt)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn, ch chan []byte) {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second * 54):
|
||||
err = c.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second*10))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
case msg := <-ch:
|
||||
err = c.WriteMessage(websocket.TextMessage, msg)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@ package data
|
|||
|
||||
import (
|
||||
"context"
|
||||
"datart/data/cl104"
|
||||
"datart/data/influx"
|
||||
"datart/data/mongo"
|
||||
"datart/data/postgres"
|
||||
|
|
@ -27,6 +28,8 @@ func (p *Processes) StartDataProcessing() {
|
|||
}
|
||||
|
||||
updatingRedisPhasor(ctx)
|
||||
|
||||
cl104.ConnectCLs()
|
||||
}
|
||||
|
||||
func (p *Processes) Cancel(ctx context.Context) {
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -5,6 +5,7 @@ go 1.24.0
|
|||
require (
|
||||
github.com/gin-gonic/gin v1.11.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
|
||||
github.com/redis/go-redis/v9 v9.14.0
|
||||
go.mongodb.org/mongo-driver/v2 v2.3.0
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -52,6 +52,8 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J
|
|||
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package route
|
|||
import (
|
||||
"datart/route/admin"
|
||||
"datart/route/api"
|
||||
"datart/route/ws"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
|
@ -20,4 +21,8 @@ func LoadRoute(engine *gin.Engine) {
|
|||
d := new(admin.Admin)
|
||||
gd := engine.Group("admin")
|
||||
gd.POST("/command", d.PostExecuteCommand)
|
||||
|
||||
w := new(ws.Ws)
|
||||
gw := engine.Group("ws")
|
||||
gw.GET("/104up", w.Cl104Up)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,231 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"datart/data/cl104"
|
||||
"datart/log"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const (
|
||||
pongWait = 60 * time.Second
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
writeWait = 10 * time.Second
|
||||
)
|
||||
|
||||
type upSession struct {
|
||||
conn *websocket.Conn
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ctrlCh chan wsMessage
|
||||
}
|
||||
|
||||
type wsMessage struct {
|
||||
mt int
|
||||
data []byte
|
||||
}
|
||||
|
||||
var upConnNum int64
|
||||
|
||||
func (w *Ws) Cl104Up(ctx *gin.Context) {
|
||||
if atomic.SwapInt64(&upConnNum, 1) > 0 {
|
||||
ctx.JSON(http.StatusConflict, nil)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
stopCtx, stopCancel := context.WithCancel(ctx.Request.Context())
|
||||
defer stopCancel()
|
||||
|
||||
session := &upSession{
|
||||
conn: conn,
|
||||
ctx: stopCtx,
|
||||
cancel: stopCancel,
|
||||
ctrlCh: make(chan wsMessage, 16),
|
||||
}
|
||||
|
||||
if err := setUpSessionConn104(session); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
startUpWorkers(session)
|
||||
}
|
||||
|
||||
func writeControl104(session *upSession, mt int, payload []byte) error {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
return session.ctx.Err()
|
||||
case session.ctrlCh <- wsMessage{mt: mt, data: payload}:
|
||||
return nil
|
||||
case <-time.After(writeWait):
|
||||
return errors.New("write control timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func setUpSessionConn104(session *upSession) error {
|
||||
session.conn.SetPongHandler(func(appData string) error {
|
||||
session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
session.conn.SetPingHandler(func(appData string) error {
|
||||
if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil {
|
||||
session.cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
session.conn.SetCloseHandler(func(code int, text string) error {
|
||||
session.cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
return session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
}
|
||||
|
||||
func startUpWorkers(session *upSession) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
monitorUpWrite(session)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
monitorUpRead(session)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
sendPing104(session)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
atomic.SwapInt64(&upConnNum, 0)
|
||||
}
|
||||
|
||||
func monitorUpWrite(session *upSession) {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
return
|
||||
case ctrl := <-session.ctrlCh:
|
||||
err = session.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
case msg := <-cl104.UpChan:
|
||||
err = session.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
err = session.conn.WriteMessage(websocket.TextMessage, msg)
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func monitorUpRead(session *upSession) {
|
||||
for {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
return
|
||||
default:
|
||||
mt, rm, err := session.conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
if ce, ok := err.(*websocket.CloseError); ok {
|
||||
log.Infof("client closed with code:", ce.Code, "text:", ce.Text)
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
log.Error("server read:", err)
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
|
||||
switch mt {
|
||||
case websocket.TextMessage:
|
||||
// TODO
|
||||
// if not type cl104.Msg?
|
||||
msg := new(cl104.Msg)
|
||||
err := json.Unmarshal(rm, msg)
|
||||
if err != nil {
|
||||
log.Error("server unmarshal:", err)
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
|
||||
if ch, ok := cl104.CA2Chan[msg.CA]; ok {
|
||||
select {
|
||||
case ch <- rm:
|
||||
case <-time.After(time.Second * 5):
|
||||
log.Error("drop msg:", msg)
|
||||
}
|
||||
} else if ch, ok := cl104.CA2Chan[0]; ok {
|
||||
select {
|
||||
case ch <- rm:
|
||||
case <-time.After(time.Second * 5):
|
||||
log.Error("drop msg:", msg)
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
log.Info("not text:", string(rm))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendPing104(session *upSession) {
|
||||
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := writeControl104(session, websocket.PingMessage, nil)
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Ws struct{}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
WriteBufferPool: &sync.Pool{
|
||||
New: func() any {
|
||||
return make([]byte, 4096)
|
||||
},
|
||||
},
|
||||
}
|
||||
Loading…
Reference in New Issue