// Package cl104 keeps websocket client connections open to the configured CL
// endpoints. Text frames received from any endpoint are pushed to UpChan, and
// byte slices placed on an endpoint's channel (see CA2Chan) are sent to that
// endpoint as text frames.
package cl104

import (
	"context"
	"time"

	"datart/config"
	"datart/log"

	"github.com/gorilla/websocket"
)

const (
	// clReconnectDelay is the pause before every connection attempt,
	// including the very first one.
	clReconnectDelay = 5 * time.Second
	// clUpSendTimeout bounds how long a received message may wait for room in
	// UpChan before it is dropped.
	clUpSendTimeout = 5 * time.Second
	// clPingInterval is how long the writer may stay idle before it sends a
	// websocket ping.
	clPingInterval = 54 * time.Second
	// clPingTimeout is the write deadline applied to each ping.
	clPingTimeout = 10 * time.Second
)

// info is one element of Msg.Infos.
// NOTE(review): the field names look like IEC 60870-5-104 terms (information
// object address, value, quality, millisecond timestamp) - confirm against
// the producer of these messages.
type info struct {
	IOA int     `json:"ioa"`
	Val float64 `json:"val"`
	Q   int     `json:"q"`
	MS  int64   `json:"ms"`
}

// Msg is the JSON shape declared for messages in this package; nothing in
// this file encodes or decodes it.
// NOTE(review): TI/COT/PN/CA presumably stand for type identification, cause
// of transmission, positive/negative flag and common address - confirm.
type Msg struct {
	TI    int     `json:"ti"`
	COT   int     `json:"cot"`
	PN    int     `json:"pn"`
	CA    int     `json:"ca"`
	Infos []*info `json:"infos"`
}

// UpChan receives the payload of every text frame read from any CL
// connection. A payload that cannot be queued within clUpSendTimeout is
// logged and dropped.
var UpChan = make(chan []byte, 64)

// cl2Chan maps a CL address to the channel whose contents are written to
// that CL's websocket connection.
var cl2Chan = map[string]chan []byte{}

// CA2Chan maps a CA to the outbound channel of the CL address it is
// configured under. Several CAs may share one channel.
var CA2Chan = map[int]chan []byte{}

// init builds cl2Chan and CA2Chan from the CL104 configuration: one buffered
// channel per CL address, shared by every CA listed for that address.
func init() {
	conf := config.Conf().CL104ConfAll()
	for _, addr2CAs := range conf {
		for addr, cas := range addr2CAs {
			clChan := make(chan []byte, 16)
			cl2Chan[addr] = clChan
			for _, ca := range cas {
				CA2Chan[ca] = clChan
			}
		}
	}
}

// ConnectCLs starts one background goroutine per configured CL address. Each
// goroutine keeps (re)connecting to its CL for the lifetime of the process.
func ConnectCLs() {
	ctx := context.Background()
	for cl, ch := range cl2Chan {
		// Arguments of a go statement are evaluated immediately, so each
		// goroutine gets its own cl/ch regardless of loop-variable semantics.
		go connectingCL(ctx, cl, ch)
	}
}

// connectingCL runs the reconnect loop for a single CL address: wait
// clReconnectDelay, open a connection, serve it until it fails, repeat.
// It returns only when ctx is cancelled.
func connectingCL(ctx context.Context, cl string, ch chan []byte) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(clReconnectDelay):
		}

		subctx, cancel := context.WithCancel(ctx)
		newConnectCL(subctx, cancel, cl, ch)
	}
}

// newConnectCL dials ws://<cl>/api/104up and, on success, serves the
// connection with one reader and one writer goroutine until ctx is cancelled
// (either goroutine cancels it on failure). The connection is closed and
// cancel is invoked on every return path.
func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) {
	// Release the context even when the dial fails and no goroutine that
	// would otherwise cancel it was ever started.
	defer cancel()

	c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil)
	if err != nil {
		log.Error("client dial:", err)
		return
	}
	// Closing the connection also unblocks a reader stuck in ReadMessage.
	defer c.Close()

	go monitoringCLRead(ctx, cancel, c)
	go monitoringCLWrite(ctx, cancel, c, ch)

	<-ctx.Done()
}

// monitoringCLRead reads frames from c until ctx is cancelled or a read
// fails. Text frames are forwarded to UpChan; any other frame type is logged
// and skipped. cancel is invoked when the function returns so the rest of the
// connection is torn down with it.
func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn) {
	defer cancel()

	for {
		if ctx.Err() != nil {
			return
		}

		mt, rm, err := c.ReadMessage()
		if err != nil {
			switch ce, ok := err.(*websocket.CloseError); {
			case !ok:
				log.Error("server read:", err)
			case ce.Code == websocket.CloseNormalClosure:
				// Normal closure: nothing worth logging.
			default:
				log.Info("client closed with code:", ce.Code, "text:", ce.Text)
			}
			return
		}

		if mt != websocket.TextMessage {
			log.Error("invalid msg type:", mt)
			continue
		}

		if !forwardUp(ctx, rm) {
			return
		}
	}
}

// forwardUp queues msg on UpChan, waiting at most clUpSendTimeout. A message
// that times out is logged and dropped. It reports false only when ctx was
// cancelled while waiting, in which case the caller should stop reading.
func forwardUp(ctx context.Context, msg []byte) bool {
	// Fast path: no timer is needed when UpChan has room.
	select {
	case UpChan <- msg:
		return true
	default:
	}

	wait := time.NewTimer(clUpSendTimeout)
	defer wait.Stop()

	select {
	case UpChan <- msg:
	case <-wait.C:
		log.Error("drop msg:", string(msg))
	case <-ctx.Done():
		return false
	}
	return true
}

// monitoringCLWrite sends every message taken from ch to c as a text frame
// and sends a ping whenever clPingInterval passes without a write. It returns
// when ctx is cancelled or a write fails; a message whose write failed is not
// re-queued. cancel is invoked on return so the reader is torn down as well.
func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn, ch chan []byte) {
	defer cancel()

	// One reusable timer instead of a fresh time.After per iteration.
	idle := time.NewTimer(clPingInterval)
	defer idle.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-idle.C:
			deadline := time.Now().Add(clPingTimeout)
			if err := c.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				log.Error("client ping:", err)
				return
			}

		case msg := <-ch:
			if err := c.WriteMessage(websocket.TextMessage, msg); err != nil {
				log.Error("client write:", err)
				return
			}
			// Drain a tick that may have fired meanwhile so the Reset below
			// starts a clean idle period on every Go version.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
		}

		idle.Reset(clPingInterval)
	}
}