diff --git a/etc/telegraf/telegraf.conf b/etc/telegraf/telegraf.conf index cf272844d..ec6481f64 100644 --- a/etc/telegraf/telegraf.conf +++ b/etc/telegraf/telegraf.conf @@ -39,7 +39,7 @@ data_format = "phasor_binary" [[inputs.cl_104]] - service_address = "tcp://:8899" + service_address = "tcp://:8877" cl_url="ws://127.0.0.1:8899/api/104" api_path="/api/104" pong_wait = "60s" diff --git a/plugins/inputs/cl_104/cl_104.go b/plugins/inputs/cl_104/cl_104.go index 4d24cad95..6a9bc4016 100644 --- a/plugins/inputs/cl_104/cl_104.go +++ b/plugins/inputs/cl_104/cl_104.go @@ -52,9 +52,6 @@ type CL104 struct { listener net.Listener url *url.URL - upChan chan []byte // confirm - clChan chan []byte // command - telegraf.Parser acc telegraf.Accumulator } @@ -115,9 +112,6 @@ func (h *CL104) Init() error { h.url = u h.tlsConf = tlsConf - h.upChan = make(chan []byte, 16) - h.clChan = make(chan []byte, 16) - return nil } @@ -160,7 +154,7 @@ func (h *CL104) Start(acc telegraf.Accumulator) error { server := h.createHTTPServer() ctx, cancel := context.WithCancel(context.Background()) - go h.connectingCL(ctx, cancel) + go h.connectingCLAndParse(ctx, cancel) h.wg.Add(1) go func() { @@ -245,7 +239,7 @@ func (h *CL104) writeControl(session *wsSession, mt int, payload []byte) error { } } -func (h *CL104) setUpSessionConn(session *wsSession) error { +func (h *CL104) setupSessionConn(session *wsSession) error { session.conn.SetPongHandler(func(appData string) error { session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait))) return nil diff --git a/plugins/inputs/cl_104/clstream.go b/plugins/inputs/cl_104/clstream.go index 8ab1f3858..8c53f6693 100644 --- a/plugins/inputs/cl_104/clstream.go +++ b/plugins/inputs/cl_104/clstream.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/internal" ) -func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) { +func (h *CL104) connectingCLAndParse(ctx context.Context, cancel context.CancelFunc) { for { select { case <-ctx.Done(): @@ -19,13 +19,13 @@ func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) { case <-h.close: return default: - h.newConnectCL(ctx, cancel) + h.newConnectCLAndParse(ctx, cancel) time.Sleep(time.Second * 5) } } } -func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) { +func (h *CL104) newConnectCLAndParse(ctx context.Context, cancel context.CancelFunc) { c, _, err := websocket.DefaultDialer.DialContext(ctx, h.CLURL, nil) if err != nil { h.Log.Error("client dial:", err) @@ -40,27 +40,33 @@ func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) { ctrlCh: make(chan wsMsg, 2), } - if err := h.setUpSessionConn(session); err != nil { + if err := h.setupSessionConn(session); err != nil { h.Log.Error(err) return } - h.startClWorkers(session) + h.startClWorkersWithChAndParse(session) } func (h *CL104) startClWorkers(session *wsSession) { + upChan := make(chan []byte, 16) + clChan := make(chan []byte, 16) + h.startClWorkersWithCh(session, upChan, clChan) +} + +func (h *CL104) startClWorkersWithCh(session *wsSession, upChan, clChan chan []byte) { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - h.monitorClWrite(session) + h.monitorClWrite(session, clChan) }() wg.Add(1) go func() { defer wg.Done() - h.monitorClRead(session) + h.monitorClRead(session, upChan) }() wg.Add(1) @@ -72,8 +78,31 @@ func (h *CL104) startClWorkers(session *wsSession) { wg.Wait() } -func (h *CL104) monitorClWrite(session *wsSession) { +func (h *CL104) startClWorkersWithChAndParse(session *wsSession) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + h.monitorClWriteAndParse(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.monitorClReadAndParse(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.sendPing(session) + }() + + wg.Wait() +} + +func (h *CL104) monitorClWrite(session *wsSession, clChan chan []byte) { var err error for { select { @@ -90,7 +119,7 @@ func (h *CL104) monitorClWrite(session *wsSession) { session.cancel() return } - case msg := <-h.clChan: + case msg := <-clChan: err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) if err != nil { session.cancel() @@ -105,7 +134,28 @@ func (h *CL104) monitorClWrite(session *wsSession) { } } -func (h *CL104) monitorClRead(session *wsSession) { +func (h *CL104) monitorClWriteAndParse(session *wsSession) { + var err error + for { + select { + case <-session.ctx.Done(): + return + case ctrl := <-session.ctrlCh: + err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + } + } +} + +func (h *CL104) monitorClRead(session *wsSession, upChan chan []byte) { for { select { case <-session.ctx.Done(): @@ -129,7 +179,7 @@ func (h *CL104) monitorClRead(session *wsSession) { switch mt { case websocket.TextMessage: - if err := h.fromClstream(session, rm); err != nil { + if err := h.fromClstream(session, rm, upChan); err != nil { h.Log.Error(err) } default: @@ -139,7 +189,76 @@ func (h *CL104) monitorClRead(session *wsSession) { } } -func (h *CL104) fromClstream(session *wsSession, m []byte) error { +func (h *CL104) monitorClReadAndParse(session *wsSession) { + for { + select { + case <-session.ctx.Done(): + return + default: + mt, rm, err := session.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + session.cancel() + return + } + if ce, ok := err.(*websocket.CloseError); ok { + h.Log.Infof("client closed with code:", ce.Code, "text:", ce.Text) + session.cancel() + return + } + h.Log.Error("server read:", err) + session.cancel() + return + } + + switch mt { + case websocket.TextMessage: + if err := h.fromClstreamAndParse(session, rm); err != nil { + h.Log.Error(err) + } + default: + h.Log.Info("rm not text") + } + } + } +} + +func (h *CL104) fromClstream(session *wsSession, m []byte, upChan chan []byte) error { + msg := new(msg) + if err := json.Unmarshal(m, msg); err != nil { + return err + } + + switch { + case msg.TI >= 1 && msg.TI <= 40, msg.TI >= 110 && msg.TI <= 119: + + case msg.TI >= 45 && msg.TI <= 69, msg.TI >= 100 && msg.TI <= 109: + select { + case <-session.ctx.Done(): + return session.ctx.Err() + default: + msg.PN = msg.COT & 0x40 + msg.COT = msg.COT & 0x3F + if m, err := json.Marshal(msg); err != nil { + return err + } else { + select { + case upChan <- m: + case <-time.After(time.Second * 5): + h.Log.Error("drop up msg:", msg) + } + } + + } + + default: + return fmt.Errorf("invalid TI: %d", msg.TI) + } + + return nil +} + +func (h *CL104) fromClstreamAndParse(session *wsSession, m []byte) error { msg := new(msg) if err := json.Unmarshal(m, msg); err != nil { return err @@ -164,23 +283,6 @@ func (h *CL104) fromClstream(session *wsSession, m []byte) error { } case msg.TI >= 45 && msg.TI <= 69, msg.TI >= 100 && msg.TI <= 109: - select { - case <-session.ctx.Done(): - return session.ctx.Err() - default: - msg.PN = msg.COT & 0x40 - msg.COT = msg.COT & 0x3F - if m, err := json.Marshal(msg); err != nil { - return err - } else { - select { - case h.upChan <- m: - case <-time.After(time.Second * 5): - h.Log.Error("drop up msg:", msg) - } - } - - } default: return fmt.Errorf("invalid TI: %d", msg.TI) diff --git a/plugins/inputs/cl_104/sample.conf b/plugins/inputs/cl_104/sample.conf index adfb681b1..343a44809 100644 --- a/plugins/inputs/cl_104/sample.conf +++ b/plugins/inputs/cl_104/sample.conf @@ -3,7 +3,7 @@ ## Address to host HTTP listener on ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp ## if unix network type provided it should be followed by absolute path for unix socket - service_address = "tcp://:8080" + service_address = "tcp://:8877" ## service_address = "tcp://:8443" ## URL to connect to server diff --git a/plugins/inputs/cl_104/upstream.go b/plugins/inputs/cl_104/upstream.go index 299c7a7b4..b5a78a8c7 100644 --- a/plugins/inputs/cl_104/upstream.go +++ b/plugins/inputs/cl_104/upstream.go @@ -11,6 +11,8 @@ import ( "github.com/gorilla/websocket" ) +const upConnLimit int64 = 10 + var upConnNum int64 func (h *CL104) serveUpstream(res http.ResponseWriter, req *http.Request) { @@ -19,50 +21,90 @@ func (h *CL104) serveUpstream(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusGone) return default: - if atomic.SwapInt64(&upConnNum, 1) > 0 { - res.WriteHeader(http.StatusConflict) + if atomic.AddInt64(&upConnNum, 1) > upConnLimit { + atomic.AddInt64(&upConnNum, -1) + res.WriteHeader(http.StatusTooManyRequests) return } } + defer atomic.AddInt64(&upConnNum, -1) - conn, err := upgrader.Upgrade(res, req, nil) + upConn, err := upgrader.Upgrade(res, req, nil) if err != nil { h.Log.Error(err) return } - defer conn.Close() + defer upConn.Close() + + clConn, _, err := websocket.DefaultDialer.Dial(h.CLURL, nil) + if err != nil { + h.Log.Error("dial clURL:", err) + return + } + defer clConn.Close() stopCtx, stopCancel := context.WithCancel(context.Background()) defer stopCancel() - session := &wsSession{ - conn: conn, + upSession := &wsSession{ + conn: upConn, ctx: stopCtx, cancel: stopCancel, ctrlCh: make(chan wsMsg, 2), } - if err := h.setUpSessionConn(session); err != nil { + clSession := &wsSession{ + conn: clConn, + ctx: stopCtx, + cancel: stopCancel, + ctrlCh: make(chan wsMsg, 2), + } + + if err := h.setupSessionConn(upSession); err != nil { + h.Log.Error(err) + return + } + if err := h.setupSessionConn(clSession); err != nil { h.Log.Error(err) return } - h.startUpWorkers(session) + upChan := make(chan []byte, 16) + clChan := make(chan []byte, 16) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + h.startUpWorkersWithCh(upSession, upChan, clChan) + }() + go func() { + defer wg.Done() + h.startClWorkersWithCh(clSession, upChan, clChan) + }() + + wg.Wait() } func (h *CL104) startUpWorkers(session *wsSession) { + upChan := make(chan []byte, 16) + clChan := make(chan []byte, 16) + h.startUpWorkersWithCh(session, upChan, clChan) +} + +func (h *CL104) startUpWorkersWithCh(session *wsSession, upChan, clChan chan []byte) { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - h.monitorUpWrite(session) + h.monitorUpWrite(session, upChan) }() wg.Add(1) go func() { defer wg.Done() - h.monitorUpRead(session) + h.monitorUpRead(session, clChan) }() wg.Add(1) @@ -72,10 +114,9 @@ func (h *CL104) startUpWorkers(session *wsSession) { }() wg.Wait() - atomic.SwapInt64(&upConnNum, 0) } -func (h *CL104) monitorUpWrite(session *wsSession) { +func (h *CL104) monitorUpWrite(session *wsSession, upChan chan []byte) { var err error for { select { @@ -92,7 +133,7 @@ func (h *CL104) monitorUpWrite(session *wsSession) { session.cancel() return } - case msg := <-h.upChan: + case msg := <-upChan: err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) if err != nil { session.cancel() @@ -107,7 +148,7 @@ func (h *CL104) monitorUpWrite(session *wsSession) { } } -func (h *CL104) monitorUpRead(session *wsSession) { +func (h *CL104) monitorUpRead(session *wsSession, clChan chan []byte) { for { select { case <-session.ctx.Done(): @@ -131,7 +172,7 @@ func (h *CL104) monitorUpRead(session *wsSession) { switch mt { case websocket.TextMessage: - if err := h.fromUpstream(rm); err != nil { + if err := h.fromUpstream(rm, clChan); err != nil { h.Log.Error(err) } default: @@ -141,7 +182,7 @@ func (h *CL104) monitorUpRead(session *wsSession) { } } -func (h *CL104) fromUpstream(m []byte) error { +func (h *CL104) fromUpstream(m []byte, clChan chan []byte) error { msg := new(msg) if err := json.Unmarshal(m, msg); err != nil { return err @@ -149,7 +190,7 @@ func (h *CL104) fromUpstream(m []byte) error { if msg.TI >= 45 && msg.TI <= 69 || msg.TI >= 100 && msg.TI <= 109 { select { - case h.clChan <- m: + case clChan <- m: case <-time.After(time.Second * 5): h.Log.Error("drop cl msg:", msg) } diff --git a/plugins/inputs/cl_kafka_subscriber/sample.conf b/plugins/inputs/cl_kafka_subscriber/sample.conf index 31285206d..6477a9d99 100644 --- a/plugins/inputs/cl_kafka_subscriber/sample.conf +++ b/plugins/inputs/cl_kafka_subscriber/sample.conf @@ -3,7 +3,7 @@ ## Address to host HTTP listener on ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp ## if unix network type provided it should be followed by absolute path for unix socket - address = "tcp://:8877" + address = "tcp://:8866" ## address = "tcp://:8443" ## Paths to listen to.