diff --git a/.drone.yml b/.drone.yml index 7acaba0ac..6c25d2d15 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,4 +9,4 @@ steps: GO111MODULE: on GOPROXY: https://goproxy.cn,direct commands: - - go build -tags "custom,outputs.influxdb_v2" ./cmd/telegraf \ No newline at end of file + - go build -tags "custom,inputs.cl_104,outputs.influxdb_v2,parsers.cl_104" ./cmd/telegraf \ No newline at end of file diff --git a/etc/telegraf/telegraf.conf b/etc/telegraf/telegraf.conf index f76280fdd..cf272844d 100644 --- a/etc/telegraf/telegraf.conf +++ b/etc/telegraf/telegraf.conf @@ -40,9 +40,9 @@ [[inputs.cl_104]] service_address = "tcp://:8899" - path_cl = "/api/104" - path_up = "/api/104up" + cl_url="ws://127.0.0.1:8899/api/104" + api_path="/api/104" pong_wait = "60s" ping_period = "54s" - write_waite = "10s" + write_wait = "10s" data_format = "cl_104" diff --git a/plugins/inputs/all/cl_104.go b/plugins/inputs/all/cl_104.go index f8d58b8f2..289c3b87e 100644 --- a/plugins/inputs/all/cl_104.go +++ b/plugins/inputs/all/cl_104.go @@ -1,4 +1,4 @@ -//go:build custom || inputs || inputs.cl_104 +//go:build !custom || inputs || inputs.cl_104 package all diff --git a/plugins/inputs/all/cl_kafka_consumer.go b/plugins/inputs/all/cl_kafka_consumer.go index acf905686..6988149f1 100644 --- a/plugins/inputs/all/cl_kafka_consumer.go +++ b/plugins/inputs/all/cl_kafka_consumer.go @@ -1,4 +1,4 @@ -//go:build custom || inputs || inputs.cl_kafka_consumer +//go:build !custom || inputs || inputs.cl_kafka_consumer package all diff --git a/plugins/inputs/all/cl_kafka_subscriber.go b/plugins/inputs/all/cl_kafka_subscriber.go index 9847bed19..d03bd0ff4 100644 --- a/plugins/inputs/all/cl_kafka_subscriber.go +++ b/plugins/inputs/all/cl_kafka_subscriber.go @@ -1,4 +1,4 @@ -//go:build custom || inputs || inputs.cl_kafka_subscriber +//go:build !custom || inputs || inputs.cl_kafka_subscriber package all diff --git a/plugins/inputs/cl_104/README.md b/plugins/inputs/cl_104/README.md index 0521167c8..97138170e 100644 --- a/plugins/inputs/cl_104/README.md +++ b/plugins/inputs/cl_104/README.md @@ -40,9 +40,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. service_address = "tcp://:8080" ## service_address = "tcp://:8443" - ## Paths to listen to. - # path_cl="/api/104" - # path_up="/api/104up" + ## URL to connect to server + cl_url="ws://127.0.0.1:8899/api/104" + ## path for access + api_path="/api/104" ## maximum duration before timing out read of the request # read_timeout = "10s" diff --git a/plugins/inputs/cl_104/cl_104.go b/plugins/inputs/cl_104/cl_104.go index 7a2b74122..4d24cad95 100644 --- a/plugins/inputs/cl_104/cl_104.go +++ b/plugins/inputs/cl_104/cl_104.go @@ -32,8 +32,8 @@ type CL104 struct { ServiceAddress string `toml:"service_address"` ReadTimeout config.Duration `toml:"read_timeout"` WriteTimeout config.Duration `toml:"write_timeout"` - PathCl string `toml:"path_cl"` - PathUp string `toml:"path_up"` + CLURL string `toml:"cl_url"` + ApiPath string `toml:"api_path"` PongWait config.Duration `toml:"pong_wait"` PingPeriod config.Duration `toml:"ping_period"` WriteWait config.Duration `toml:"write_wait"` @@ -52,7 +52,6 @@ type CL104 struct { listener net.Listener url *url.URL - route map[string]func(res http.ResponseWriter, req *http.Request) upChan chan []byte // confirm clChan chan []byte // command @@ -116,10 +115,6 @@ func (h *CL104) Init() error { h.url = u h.tlsConf = tlsConf - h.route = map[string]func(res http.ResponseWriter, req *http.Request){ - h.PathCl: h.serveClstream, - h.PathUp: h.serveUpstream, - } h.upChan = make(chan []byte, 16) h.clChan = make(chan []byte, 16) @@ -164,6 +159,9 @@ func (h *CL104) Start(acc telegraf.Accumulator) error { server := h.createHTTPServer() + ctx, cancel := context.WithCancel(context.Background()) + go h.connectingCL(ctx, cancel) + h.wg.Add(1) go func() { defer h.wg.Done() @@ -172,6 +170,7 @@ func (h *CL104) Start(acc telegraf.Accumulator) error { h.Log.Errorf("Serve failed: %v", err) } close(h.close) + cancel() } }() @@ -193,9 +192,9 @@ func (h *CL104) Stop() { // ServeHTTP implements [http.Handler] func (h *CL104) ServeHTTP(res http.ResponseWriter, req *http.Request) { + handler := h.serveUpstream - handler, ok := h.route[req.URL.Path] - if !ok { + if req.URL.Path != h.ApiPath { handler = http.NotFound } diff --git a/plugins/inputs/cl_104/clstream.go b/plugins/inputs/cl_104/clstream.go index b3858d716..dd147afd9 100644 --- a/plugins/inputs/cl_104/clstream.go +++ b/plugins/inputs/cl_104/clstream.go @@ -4,43 +4,39 @@ import ( "context" "encoding/json" "fmt" - "net/http" "sync" - "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/influxdata/telegraf/internal" ) -var clConnNum int64 - -func (h *CL104) serveClstream(res http.ResponseWriter, req *http.Request) { - select { - case <-h.close: - res.WriteHeader(http.StatusGone) - return - default: - if atomic.SwapInt64(&clConnNum, 1) > 0 { - res.WriteHeader(http.StatusConflict) +func (h *CL104) connectingCL(ctx context.Context, cancel context.CancelFunc) { + for { + select { + case <-ctx.Done(): return + case <-h.close: + return + default: + h.newConnectCL(ctx, cancel) + time.Sleep(time.Second * 5) } } +} - conn, err := upgrader.Upgrade(res, req, nil) +func (h *CL104) newConnectCL(ctx context.Context, cancel context.CancelFunc) { + c, _, err := websocket.DefaultDialer.DialContext(ctx, h.CLURL, nil) if err != nil { - h.Log.Error(err) + h.Log.Error("client dial:", err) return } - defer conn.Close() - - stopCtx, stopCancel := context.WithCancel(context.Background()) - defer stopCancel() + defer c.Close() session := &wsSession{ - conn: conn, - ctx: stopCtx, - cancel: stopCancel, + conn: c, + ctx: ctx, + cancel: cancel, ctrlCh: make(chan wsMsg, 2), } @@ -74,7 +70,6 @@ func (h *CL104) startClWorkers(session *wsSession) { }() wg.Wait() - atomic.SwapInt64(&clConnNum, 0) } func (h *CL104) monitorClWrite(session *wsSession) { diff --git a/plugins/inputs/cl_104/sample.conf b/plugins/inputs/cl_104/sample.conf index b29b26ea4..adfb681b1 100644 --- a/plugins/inputs/cl_104/sample.conf +++ b/plugins/inputs/cl_104/sample.conf @@ -6,9 +6,10 @@ service_address = "tcp://:8080" ## service_address = "tcp://:8443" - ## Paths to listen to. - # path_cl="/api/104" - # path_up="/api/104up" + ## URL to connect to server + cl_url="ws://127.0.0.1:8899/api/104" + ## path for access + api_path="/api/104" ## maximum duration before timing out read of the request # read_timeout = "10s" diff --git a/plugins/parsers/all/cl_104.go b/plugins/parsers/all/cl_104.go index f945df37d..51835335b 100644 --- a/plugins/parsers/all/cl_104.go +++ b/plugins/parsers/all/cl_104.go @@ -1,4 +1,4 @@ -//go:build custom || parsers || parsers.cl_104 +//go:build !custom || parsers || parsers.cl_104 package all diff --git a/plugins/parsers/all/phasor_binary.go b/plugins/parsers/all/phasor_binary.go index 49b3fe979..5297df1d1 100644 --- a/plugins/parsers/all/phasor_binary.go +++ b/plugins/parsers/all/phasor_binary.go @@ -1,4 +1,4 @@ -//go:build custom || parsers || parsers.phasor_binary +//go:build !custom || parsers || parsers.phasor_binary package all