package cl_104

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
	"github.com/influxdata/telegraf/internal"
)

// clConnNum is 1 while a clstream websocket session holds the single
// connection slot and 0 otherwise. It is only accessed through sync/atomic.
var clConnNum int64

// serveClstream is the HTTP handler that upgrades a request to a websocket
// and runs the clstream session on it until the session ends.
//
// It answers 410 Gone once h.close is readable and 409 Conflict while another
// session holds the connection slot. Upgrade and session set-up errors are
// logged and end the request.
func (h *CL104) serveClstream(res http.ResponseWriter, req *http.Request) {
	select {
	case <-h.close:
		res.WriteHeader(http.StatusGone)
		return
	default:
	}

	// Claim the single connection slot; only one session may run at a time.
	if !atomic.CompareAndSwapInt64(&clConnNum, 0, 1) {
		res.WriteHeader(http.StatusConflict)
		return
	}

	// The slot is released by startClWorkers once the workers have finished.
	// The error paths below never reach it, so they release the slot
	// themselves; otherwise a single failed handshake would make every later
	// request fail with 409. A deferred release is deliberately not used: it
	// would run after startClWorkers has already freed the slot and could
	// clear a claim made by a newer connection in the meantime.
	conn, err := upgrader.Upgrade(res, req, nil)
	if err != nil {
		atomic.StoreInt64(&clConnNum, 0)
		h.Log.Error(err)
		return
	}
	defer conn.Close()

	stopCtx, stopCancel := context.WithCancel(context.Background())
	defer stopCancel()

	session := &wsSession{
		conn:   conn,
		ctx:    stopCtx,
		cancel: stopCancel,
		ctrlCh: make(chan wsMsg, 2),
	}
	if err := h.setUpSessionConn(session); err != nil {
		atomic.StoreInt64(&clConnNum, 0)
		h.Log.Error(err)
		return
	}

	h.startClWorkers(session)
}

// startClWorkers runs the writer, reader and ping workers for session, blocks
// until all three have returned, and then frees the connection slot.
func (h *CL104) startClWorkers(session *wsSession) {
	var wg sync.WaitGroup
	wg.Add(3)

	go func() {
		defer wg.Done()
		h.monitorClWrite(session)
	}()
	go func() {
		defer wg.Done()
		h.monitorClRead(session)
	}()
	go func() {
		defer wg.Done()
		h.sendPing(session)
	}()

	wg.Wait()
	atomic.StoreInt64(&clConnNum, 0)
}

// monitorClWrite performs the writes on the session connection: control
// frames taken from session.ctrlCh and text messages taken from h.clChan.
// Every write gets a deadline of h.WriteWait from now. It returns when the
// session context is cancelled; on the first write error it logs the error,
// cancels the session and returns.
func (h *CL104) monitorClWrite(session *wsSession) {
	for {
		var err error

		select {
		case <-session.ctx.Done():
			return
		case ctrl := <-session.ctrlCh:
			deadline := time.Now().Add(time.Duration(h.WriteWait))
			if err = session.conn.SetWriteDeadline(deadline); err == nil {
				err = session.conn.WriteControl(ctrl.mt, ctrl.data, deadline)
			}
		case payload := <-h.clChan:
			deadline := time.Now().Add(time.Duration(h.WriteWait))
			if err = session.conn.SetWriteDeadline(deadline); err == nil {
				err = session.conn.WriteMessage(websocket.TextMessage, payload)
			}
		}

		if err != nil {
			h.Log.Error("server write:", err)
			session.cancel()
			return
		}
	}
}

// monitorClRead reads messages from the session connection until the session
// context is cancelled or a read fails. Text messages are handed to
// fromClstream; other message types are only logged. Any read error cancels
// the session.
//
// The context is checked between reads only; ReadMessage itself blocks until
// a message or an error arrives.
func (h *CL104) monitorClRead(session *wsSession) {
	for {
		select {
		case <-session.ctx.Done():
			return
		default:
		}

		mt, rm, err := session.conn.ReadMessage()
		if err != nil {
			// A normal closure is expected and not logged; other close codes
			// are informational; everything else is a read failure.
			if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				if ce, ok := err.(*websocket.CloseError); ok {
					h.Log.Infof("client closed with code: %d text: %s", ce.Code, ce.Text)
				} else {
					h.Log.Error("server read:", err)
				}
			}
			session.cancel()
			return
		}

		switch mt {
		case websocket.TextMessage:
			if err := h.fromClstream(session, rm); err != nil {
				h.Log.Error(err)
			}
		default:
			h.Log.Info("not text:", string(rm))
		}
	}
}

// fromClstream decodes one JSON text message received from the client and
// dispatches it on its TI field:
//
//   - TI 1..40 or 110..119: m is passed to h.Parse and the resulting metrics
//     are added to the accumulator.
//   - TI 45..69 or 100..109: bit 0x40 of COT is moved into PN, COT is reduced
//     to its low six bits, and the re-encoded message is sent on h.upChan.
//     If the send does not complete within five seconds the message is
//     dropped and logged.
//   - any other TI: an "invalid TI" error is returned.
//
// It returns JSON decode/encode errors, parse errors, and the context error
// when the session is already cancelled before a message is forwarded.
func (h *CL104) fromClstream(session *wsSession, m []byte) error {
	frame := new(msg)
	if err := json.Unmarshal(m, frame); err != nil {
		return err
	}

	switch {
	case frame.TI >= 1 && frame.TI <= 40, frame.TI >= 110 && frame.TI <= 119:
		metrics, err := h.Parse(m)
		if err != nil {
			h.Log.Debugf("parse error: %s", err.Error())
			return err
		}
		if len(metrics) == 0 {
			once.Do(func() {
				h.Log.Debug(internal.NoMetricsCreatedMsg)
			})
		}
		for _, metric := range metrics {
			h.acc.AddMetric(metric)
		}

	case frame.TI >= 45 && frame.TI <= 69, frame.TI >= 100 && frame.TI <= 109:
		// Do not forward anything for a session that has already ended.
		select {
		case <-session.ctx.Done():
			return session.ctx.Err()
		default:
		}

		frame.PN = frame.COT & 0x40
		frame.COT &= 0x3F

		out, err := json.Marshal(frame)
		if err != nil {
			return err
		}

		select {
		case h.upChan <- out:
		case <-time.After(time.Second * 5):
			h.Log.Error("drop up msg:", frame)
		}

	default:
		return fmt.Errorf("invalid TI: %d", frame.TI)
	}

	return nil
}