update for 104

This commit is contained in:
zhuxu 2026-06-12 17:05:21 +08:00
parent 1bcfc7d916
commit b207368c51
6 changed files with 194 additions and 57 deletions

View File

@ -39,7 +39,7 @@
data_format = "phasor_binary"
[[inputs.cl_104]]
service_address = "tcp://:8899"
service_address = "tcp://:8877"
cl_url="ws://127.0.0.1:8899/api/104"
api_path="/api/104"
pong_wait = "60s"

View File

@ -52,9 +52,6 @@ type CL104 struct {
listener net.Listener
url *url.URL
upChan chan []byte // confirm
clChan chan []byte // command
telegraf.Parser
acc telegraf.Accumulator
}
@ -115,9 +112,6 @@ func (h *CL104) Init() error {
h.url = u
h.tlsConf = tlsConf
h.upChan = make(chan []byte, 16)
h.clChan = make(chan []byte, 16)
return nil
}
@ -160,7 +154,7 @@ func (h *CL104) Start(acc telegraf.Accumulator) error {
server := h.createHTTPServer()
ctx, cancel := context.WithCancel(context.Background())
go h.connectingCL(ctx, cancel)
go h.connectingCLAndParse(ctx, cancel)
h.wg.Add(1)
go func() {
@ -245,7 +239,7 @@ func (h *CL104) writeControl(session *wsSession, mt int, payload []byte) error {
}
}
func (h *CL104) setUpSessionConn(session *wsSession) error {
func (h *CL104) setupSessionConn(session *wsSession) error {
session.conn.SetPongHandler(func(appData string) error {
session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait)))
return nil

View File

@ -11,7 +11,7 @@ import (
"github.com/influxdata/telegraf/internal"
)
func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) {
func (h *CL104) connectingCLAndParse(ctx context.Context, cancel context.CancelFunc) {
for {
select {
case <-ctx.Done():
@ -19,13 +19,13 @@ func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) {
case <-h.close:
return
default:
h.newConnectCL(ctx, cancel)
h.newConnectCLAndParse(ctx, cancel)
time.Sleep(time.Second * 5)
}
}
}
func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) {
func (h *CL104) newConnectCLAndParse(ctx context.Context, cancel context.CancelFunc) {
c, _, err := websocket.DefaultDialer.DialContext(ctx, h.CLURL, nil)
if err != nil {
h.Log.Error("client dial:", err)
@ -40,27 +40,33 @@ func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) {
ctrlCh: make(chan wsMsg, 2),
}
if err := h.setUpSessionConn(session); err != nil {
if err := h.setupSessionConn(session); err != nil {
h.Log.Error(err)
return
}
h.startClWorkers(session)
h.startClWorkersWithChAndParse(session)
}
func (h *CL104) startClWorkers(session *wsSession) {
upChan := make(chan []byte, 16)
clChan := make(chan []byte, 16)
h.startClWorkersWithCh(session, upChan, clChan)
}
func (h *CL104) startClWorkersWithCh(session *wsSession, upChan, clChan chan []byte) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
h.monitorClWrite(session)
h.monitorClWrite(session, clChan)
}()
wg.Add(1)
go func() {
defer wg.Done()
h.monitorClRead(session)
h.monitorClRead(session, upChan)
}()
wg.Add(1)
@ -72,8 +78,31 @@ func (h *CL104) startClWorkers(session *wsSession) {
wg.Wait()
}
func (h *CL104) monitorClWrite(session *wsSession) {
func (h *CL104) startClWorkersWithChAndParse(session *wsSession) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
h.monitorClWriteAndParse(session)
}()
wg.Add(1)
go func() {
defer wg.Done()
h.monitorClReadAndParse(session)
}()
wg.Add(1)
go func() {
defer wg.Done()
h.sendPing(session)
}()
wg.Wait()
}
func (h *CL104) monitorClWrite(session *wsSession, clChan chan []byte) {
var err error
for {
select {
@ -90,7 +119,7 @@ func (h *CL104) monitorClWrite(session *wsSession) {
session.cancel()
return
}
case msg := <-h.clChan:
case msg := <-clChan:
err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait)))
if err != nil {
session.cancel()
@ -105,7 +134,28 @@ func (h *CL104) monitorClWrite(session *wsSession) {
}
}
func (h *CL104) monitorClRead(session *wsSession) {
func (h *CL104) monitorClWriteAndParse(session *wsSession) {
var err error
for {
select {
case <-session.ctx.Done():
return
case ctrl := <-session.ctrlCh:
err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait)))
if err != nil {
session.cancel()
return
}
err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(time.Duration(h.WriteWait)))
if err != nil {
session.cancel()
return
}
}
}
}
func (h *CL104) monitorClRead(session *wsSession, upChan chan []byte) {
for {
select {
case <-session.ctx.Done():
@ -129,7 +179,7 @@ func (h *CL104) monitorClRead(session *wsSession) {
switch mt {
case websocket.TextMessage:
if err := h.fromClstream(session, rm); err != nil {
if err := h.fromClstream(session, rm, upChan); err != nil {
h.Log.Error(err)
}
default:
@ -139,7 +189,76 @@ func (h *CL104) monitorClRead(session *wsSession) {
}
}
func (h *CL104) fromClstream(session *wsSession, m []byte) error {
func (h *CL104) monitorClReadAndParse(session *wsSession) {
for {
select {
case <-session.ctx.Done():
return
default:
mt, rm, err := session.conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
session.cancel()
return
}
if ce, ok := err.(*websocket.CloseError); ok {
h.Log.Infof("client closed with code:", ce.Code, "text:", ce.Text)
session.cancel()
return
}
h.Log.Error("server read:", err)
session.cancel()
return
}
switch mt {
case websocket.TextMessage:
if err := h.fromClstreamAndParse(session, rm); err != nil {
h.Log.Error(err)
}
default:
h.Log.Info("rm not text")
}
}
}
}
func (h *CL104) fromClstream(session *wsSession, m []byte, upChan chan []byte) error {
msg := new(msg)
if err := json.Unmarshal(m, msg); err != nil {
return err
}
switch {
case msg.TI >= 1 && msg.TI <= 40, msg.TI >= 110 && msg.TI <= 119:
case msg.TI >= 45 && msg.TI <= 69, msg.TI >= 100 && msg.TI <= 109:
select {
case <-session.ctx.Done():
return session.ctx.Err()
default:
msg.PN = msg.COT & 0x40
msg.COT = msg.COT & 0x3F
if m, err := json.Marshal(msg); err != nil {
return err
} else {
select {
case upChan <- m:
case <-time.After(time.Second * 5):
h.Log.Error("drop up msg:", msg)
}
}
}
default:
return fmt.Errorf("invalid TI: %d", msg.TI)
}
return nil
}
func (h *CL104) fromClstreamAndParse(session *wsSession, m []byte) error {
msg := new(msg)
if err := json.Unmarshal(m, msg); err != nil {
return err
@ -164,23 +283,6 @@ func (h *CL104) fromClstream(session *wsSession, m []byte) error {
}
case msg.TI >= 45 && msg.TI <= 69, msg.TI >= 100 && msg.TI <= 109:
select {
case <-session.ctx.Done():
return session.ctx.Err()
default:
msg.PN = msg.COT & 0x40
msg.COT = msg.COT & 0x3F
if m, err := json.Marshal(msg); err != nil {
return err
} else {
select {
case h.upChan <- m:
case <-time.After(time.Second * 5):
h.Log.Error("drop up msg:", msg)
}
}
}
default:
return fmt.Errorf("invalid TI: %d", msg.TI)

View File

@ -3,7 +3,7 @@
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
service_address = "tcp://:8080"
service_address = "tcp://:8877"
## service_address = "tcp://:8443"
## URL to connect to server

View File

@ -11,6 +11,8 @@ import (
"github.com/gorilla/websocket"
)
const upConnLimit int64 = 10
var upConnNum int64
func (h *CL104) serveUpstream(res http.ResponseWriter, req *http.Request) {
@ -19,50 +21,90 @@ func (h *CL104) serveUpstream(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusGone)
return
default:
if atomic.SwapInt64(&upConnNum, 1) > 0 {
res.WriteHeader(http.StatusConflict)
if atomic.AddInt64(&upConnNum, 1) > upConnLimit {
atomic.AddInt64(&upConnNum, -1)
res.WriteHeader(http.StatusTooManyRequests)
return
}
}
defer atomic.AddInt64(&upConnNum, -1)
conn, err := upgrader.Upgrade(res, req, nil)
upConn, err := upgrader.Upgrade(res, req, nil)
if err != nil {
h.Log.Error(err)
return
}
defer conn.Close()
defer upConn.Close()
clConn, _, err := websocket.DefaultDialer.Dial(h.CLURL, nil)
if err != nil {
h.Log.Error("dial clURL:", err)
return
}
defer clConn.Close()
stopCtx, stopCancel := context.WithCancel(context.Background())
defer stopCancel()
session := &wsSession{
conn: conn,
upSession := &wsSession{
conn: upConn,
ctx: stopCtx,
cancel: stopCancel,
ctrlCh: make(chan wsMsg, 2),
}
if err := h.setUpSessionConn(session); err != nil {
clSession := &wsSession{
conn: clConn,
ctx: stopCtx,
cancel: stopCancel,
ctrlCh: make(chan wsMsg, 2),
}
if err := h.setupSessionConn(upSession); err != nil {
h.Log.Error(err)
return
}
if err := h.setupSessionConn(clSession); err != nil {
h.Log.Error(err)
return
}
h.startUpWorkers(session)
upChan := make(chan []byte, 16)
clChan := make(chan []byte, 16)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
h.startUpWorkersWithCh(upSession, upChan, clChan)
}()
go func() {
defer wg.Done()
h.startClWorkersWithCh(clSession, upChan, clChan)
}()
wg.Wait()
}
func (h *CL104) startUpWorkers(session *wsSession) {
upChan := make(chan []byte, 16)
clChan := make(chan []byte, 16)
h.startUpWorkersWithCh(session, upChan, clChan)
}
func (h *CL104) startUpWorkersWithCh(session *wsSession, upChan, clChan chan []byte) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
h.monitorUpWrite(session)
h.monitorUpWrite(session, upChan)
}()
wg.Add(1)
go func() {
defer wg.Done()
h.monitorUpRead(session)
h.monitorUpRead(session, clChan)
}()
wg.Add(1)
@ -72,10 +114,9 @@ func (h *CL104) startUpWorkers(session *wsSession) {
}()
wg.Wait()
atomic.SwapInt64(&upConnNum, 0)
}
func (h *CL104) monitorUpWrite(session *wsSession) {
func (h *CL104) monitorUpWrite(session *wsSession, upChan chan []byte) {
var err error
for {
select {
@ -92,7 +133,7 @@ func (h *CL104) monitorUpWrite(session *wsSession) {
session.cancel()
return
}
case msg := <-h.upChan:
case msg := <-upChan:
err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait)))
if err != nil {
session.cancel()
@ -107,7 +148,7 @@ func (h *CL104) monitorUpWrite(session *wsSession) {
}
}
func (h *CL104) monitorUpRead(session *wsSession) {
func (h *CL104) monitorUpRead(session *wsSession, clChan chan []byte) {
for {
select {
case <-session.ctx.Done():
@ -131,7 +172,7 @@ func (h *CL104) monitorUpRead(session *wsSession) {
switch mt {
case websocket.TextMessage:
if err := h.fromUpstream(rm); err != nil {
if err := h.fromUpstream(rm, clChan); err != nil {
h.Log.Error(err)
}
default:
@ -141,7 +182,7 @@ func (h *CL104) monitorUpRead(session *wsSession) {
}
}
func (h *CL104) fromUpstream(m []byte) error {
func (h *CL104) fromUpstream(m []byte, clChan chan []byte) error {
msg := new(msg)
if err := json.Unmarshal(m, msg); err != nil {
return err
@ -149,7 +190,7 @@ func (h *CL104) fromUpstream(m []byte) error {
if msg.TI >= 45 && msg.TI <= 69 || msg.TI >= 100 && msg.TI <= 109 {
select {
case h.clChan <- m:
case clChan <- m:
case <-time.After(time.Second * 5):
h.Log.Error("drop cl msg:", msg)
}

View File

@ -3,7 +3,7 @@
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
address = "tcp://:8877"
address = "tcp://:8866"
## address = "tcp://:8443"
## Paths to listen to.