Compare commits

..

No commits in common. "1bcfc7d91688431cec2eabb0615a882130e9c6a1" and "3830fff0d58c9f33306b9203d7b4596330c87032" have entirely different histories.

11 changed files with 47 additions and 43 deletions

View File

@ -9,4 +9,4 @@ steps:
GO111MODULE: on GO111MODULE: on
GOPROXY: https://goproxy.cn,direct GOPROXY: https://goproxy.cn,direct
commands: commands:
- go build -tags "custom,inputs.cl_104,outputs.influxdb_v2,parsers.cl_104" ./cmd/telegraf - go build -tags "custom,outputs.influxdb_v2" ./cmd/telegraf

View File

@ -40,9 +40,9 @@
[[inputs.cl_104]] [[inputs.cl_104]]
service_address = "tcp://:8899" service_address = "tcp://:8899"
cl_url="ws://127.0.0.1:8899/api/104" path_cl = "/api/104"
api_path="/api/104" path_up = "/api/104up"
pong_wait = "60s" pong_wait = "60s"
ping_period = "54s" ping_period = "54s"
write_wait = "10s" write_waite = "10s"
data_format = "cl_104" data_format = "cl_104"

View File

@ -1,4 +1,4 @@
//go:build !custom || inputs || inputs.cl_104 //go:build custom || inputs || inputs.cl_104
package all package all

View File

@ -1,4 +1,4 @@
//go:build !custom || inputs || inputs.cl_kafka_consumer //go:build custom || inputs || inputs.cl_kafka_consumer
package all package all

View File

@ -1,4 +1,4 @@
//go:build !custom || inputs || inputs.cl_kafka_subscriber //go:build custom || inputs || inputs.cl_kafka_subscriber
package all package all

View File

@ -40,10 +40,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
service_address = "tcp://:8080" service_address = "tcp://:8080"
## service_address = "tcp://:8443" ## service_address = "tcp://:8443"
## URL to connect to server ## Paths to listen to.
cl_url="ws://127.0.0.1:8899/api/104" # path_cl="/api/104"
## path for access # path_up="/api/104up"
api_path="/api/104"
## maximum duration before timing out read of the request ## maximum duration before timing out read of the request
# read_timeout = "10s" # read_timeout = "10s"

View File

@ -32,8 +32,8 @@ type CL104 struct {
ServiceAddress string `toml:"service_address"` ServiceAddress string `toml:"service_address"`
ReadTimeout config.Duration `toml:"read_timeout"` ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"` WriteTimeout config.Duration `toml:"write_timeout"`
CLURL string `toml:"cl_url"` PathCl string `toml:"path_cl"`
ApiPath string `toml:"api_path"` PathUp string `toml:"path_up"`
PongWait config.Duration `toml:"pong_wait"` PongWait config.Duration `toml:"pong_wait"`
PingPeriod config.Duration `toml:"ping_period"` PingPeriod config.Duration `toml:"ping_period"`
WriteWait config.Duration `toml:"write_wait"` WriteWait config.Duration `toml:"write_wait"`
@ -52,6 +52,7 @@ type CL104 struct {
listener net.Listener listener net.Listener
url *url.URL url *url.URL
route map[string]func(res http.ResponseWriter, req *http.Request)
upChan chan []byte // confirm upChan chan []byte // confirm
clChan chan []byte // command clChan chan []byte // command
@ -115,6 +116,10 @@ func (h *CL104) Init() error {
h.url = u h.url = u
h.tlsConf = tlsConf h.tlsConf = tlsConf
h.route = map[string]func(res http.ResponseWriter, req *http.Request){
h.PathCl: h.serveClstream,
h.PathUp: h.serveUpstream,
}
h.upChan = make(chan []byte, 16) h.upChan = make(chan []byte, 16)
h.clChan = make(chan []byte, 16) h.clChan = make(chan []byte, 16)
@ -159,9 +164,6 @@ func (h *CL104) Start(acc telegraf.Accumulator) error {
server := h.createHTTPServer() server := h.createHTTPServer()
ctx, cancel := context.WithCancel(context.Background())
go h.connectingCL(ctx, cancel)
h.wg.Add(1) h.wg.Add(1)
go func() { go func() {
defer h.wg.Done() defer h.wg.Done()
@ -170,7 +172,6 @@ func (h *CL104) Start(acc telegraf.Accumulator) error {
h.Log.Errorf("Serve failed: %v", err) h.Log.Errorf("Serve failed: %v", err)
} }
close(h.close) close(h.close)
cancel()
} }
}() }()
@ -192,9 +193,9 @@ func (h *CL104) Stop() {
// ServeHTTP implements [http.Handler] // ServeHTTP implements [http.Handler]
func (h *CL104) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *CL104) ServeHTTP(res http.ResponseWriter, req *http.Request) {
handler := h.serveUpstream
if req.URL.Path != h.ApiPath { handler, ok := h.route[req.URL.Path]
if !ok {
handler = http.NotFound handler = http.NotFound
} }

View File

@ -4,39 +4,43 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
) )
func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) { var clConnNum int64
for {
func (h *CL104) serveClstream(res http.ResponseWriter, req *http.Request) {
select { select {
case <-ctx.Done():
return
case <-h.close: case <-h.close:
res.WriteHeader(http.StatusGone)
return return
default: default:
h.newConnectCL(ctx, cancel) if atomic.SwapInt64(&clConnNum, 1) > 0 {
time.Sleep(time.Second * 5) res.WriteHeader(http.StatusConflict)
}
}
}
func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) {
c, _, err := websocket.DefaultDialer.DialContext(ctx, h.CLURL, nil)
if err != nil {
h.Log.Error("client dial:", err)
return return
} }
defer c.Close() }
conn, err := upgrader.Upgrade(res, req, nil)
if err != nil {
h.Log.Error(err)
return
}
defer conn.Close()
stopCtx, stopCancel := context.WithCancel(context.Background())
defer stopCancel()
session := &wsSession{ session := &wsSession{
conn: c, conn: conn,
ctx: ctx, ctx: stopCtx,
cancel: cancel, cancel: stopCancel,
ctrlCh: make(chan wsMsg, 2), ctrlCh: make(chan wsMsg, 2),
} }
@ -70,6 +74,7 @@ func (h *CL104) startClWorkers(session *wsSession) {
}() }()
wg.Wait() wg.Wait()
atomic.SwapInt64(&clConnNum, 0)
} }
func (h *CL104) monitorClWrite(session *wsSession) { func (h *CL104) monitorClWrite(session *wsSession) {
@ -133,7 +138,7 @@ func (h *CL104) monitorClRead(session *wsSession) {
h.Log.Error(err) h.Log.Error(err)
} }
default: default:
h.Log.Info("rm not text") h.Log.Info("not text:", string(rm))
} }
} }
} }

View File

@ -6,10 +6,9 @@
service_address = "tcp://:8080" service_address = "tcp://:8080"
## service_address = "tcp://:8443" ## service_address = "tcp://:8443"
## URL to connect to server ## Paths to listen to.
cl_url="ws://127.0.0.1:8899/api/104" # path_cl="/api/104"
## path for access # path_up="/api/104up"
api_path="/api/104"
## maximum duration before timing out read of the request ## maximum duration before timing out read of the request
# read_timeout = "10s" # read_timeout = "10s"

View File

@ -1,4 +1,4 @@
//go:build !custom || parsers || parsers.cl_104 //go:build custom || parsers || parsers.cl_104
package all package all

View File

@ -1,4 +1,4 @@
//go:build !custom || parsers || parsers.phasor_binary //go:build custom || parsers || parsers.phasor_binary
package all package all