From f39c912e9ae904d5038fb132706e53d7a1aa3571 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Thu, 5 Feb 2026 20:48:57 +0800 Subject: [PATCH] cl 104 parse --- .drone.yml | 2 +- docs/DATA_FORMATS_INPUT.md | 1 + etc/telegraf/telegraf.conf | 9 + plugins/inputs/all/cl_104.go | 5 + plugins/inputs/all/cl_kafka_consumer.go | 2 +- plugins/inputs/cl_104/README.md | 88 +++++++ plugins/inputs/cl_104/cl_104.go | 309 ++++++++++++++++++++++++ plugins/inputs/cl_104/cl_104_test.go | 33 +++ plugins/inputs/cl_104/clstream.go | 195 +++++++++++++++ plugins/inputs/cl_104/sample.conf | 44 ++++ plugins/inputs/cl_104/upstream.go | 159 ++++++++++++ plugins/parsers/all/cl_104.go | 5 + plugins/parsers/all/phasor_binary.go | 2 +- plugins/parsers/cl_104/README.md | 54 +++++ plugins/parsers/cl_104/parser.go | 109 +++++++++ plugins/parsers/cl_104/parser_test.go | 1 + 16 files changed, 1015 insertions(+), 3 deletions(-) create mode 100644 plugins/inputs/all/cl_104.go create mode 100644 plugins/inputs/cl_104/README.md create mode 100644 plugins/inputs/cl_104/cl_104.go create mode 100644 plugins/inputs/cl_104/cl_104_test.go create mode 100644 plugins/inputs/cl_104/clstream.go create mode 100644 plugins/inputs/cl_104/sample.conf create mode 100644 plugins/inputs/cl_104/upstream.go create mode 100644 plugins/parsers/all/cl_104.go create mode 100644 plugins/parsers/cl_104/README.md create mode 100644 plugins/parsers/cl_104/parser.go create mode 100644 plugins/parsers/cl_104/parser_test.go diff --git a/.drone.yml b/.drone.yml index 0d16cb9ec..7acaba0ac 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,4 +9,4 @@ steps: GO111MODULE: on GOPROXY: https://goproxy.cn,direct commands: - - go build -tags "custom,inputs.cl_kafka_consumer,outputs.influxdb_v2,parsers.phasor_binary" ./cmd/telegraf \ No newline at end of file + - go build -tags "custom,outputs.influxdb_v2" ./cmd/telegraf \ No newline at end of file diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 8d23e7d31..c25a504e9 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -27,6 +27,7 @@ Protocol, JSON format, or Apache Avro format. - [Wavefront](/plugins/parsers/wavefront) - [XPath](/plugins/parsers/xpath) (supports XML, JSON, MessagePack, Protocol Buffers) - [PhasorBinary](/plugins/parsers/phasor_binary) (supports special binary from CL) +- [CL104](/plugins/parsers/cl_104) (supports special 104 from CL) Any input plugin containing the `data_format` option can use it to select the desired parser: diff --git a/etc/telegraf/telegraf.conf b/etc/telegraf/telegraf.conf index 65e862261..f76280fdd 100644 --- a/etc/telegraf/telegraf.conf +++ b/etc/telegraf/telegraf.conf @@ -37,3 +37,12 @@ compression_codec = 4 max_message_len = 1000000 data_format = "phasor_binary" + +[[inputs.cl_104]] + service_address = "tcp://:8899" + path_cl = "/api/104" + path_up = "/api/104up" + pong_wait = "60s" + ping_period = "54s" + write_waite = "10s" + data_format = "cl_104" diff --git a/plugins/inputs/all/cl_104.go b/plugins/inputs/all/cl_104.go new file mode 100644 index 000000000..f8d58b8f2 --- /dev/null +++ b/plugins/inputs/all/cl_104.go @@ -0,0 +1,5 @@ +//go:build custom || inputs || inputs.cl_104 + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/cl_104" // register plugin diff --git a/plugins/inputs/all/cl_kafka_consumer.go b/plugins/inputs/all/cl_kafka_consumer.go index 6988149f1..acf905686 100644 --- a/plugins/inputs/all/cl_kafka_consumer.go +++ b/plugins/inputs/all/cl_kafka_consumer.go @@ -1,4 +1,4 @@ -//go:build !custom || inputs || inputs.cl_kafka_consumer +//go:build custom || inputs || inputs.cl_kafka_consumer package all diff --git a/plugins/inputs/cl_104/README.md b/plugins/inputs/cl_104/README.md new file mode 100644 index 000000000..0521167c8 --- /dev/null +++ b/plugins/inputs/cl_104/README.md @@ -0,0 +1,88 @@ +# CL 104 Input Plugin + +This plugin listens for metrics sent via WS in any of the supported +[data formats][data_formats]. + +⭐ Telegraf v1.9.0 +🏷️ server +💻 all + +[data_formats]: /docs/DATA_FORMATS_INPUT.md + +## Service Input + +This plugin is a service input. Normal plugins gather metrics determined by the +interval setting. Service plugins start a service to listens and waits for +metrics or events to occur. Service plugins have two key differences from +normal plugins: + +1. The global or plugin specific `interval` setting may not apply +2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce + output for this plugin + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Generic HTTP write listener +[[inputs.cl_104]] + ## Address to host HTTP listener on + ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp + ## if unix network type provided it should be followed by absolute path for unix socket + service_address = "tcp://:8080" + ## service_address = "tcp://:8443" + + ## Paths to listen to. + # path_cl="/api/104" + # path_up="/api/104up" + + ## maximum duration before timing out read of the request + # read_timeout = "10s" + ## maximum duration before timing out write of the response + # write_timeout = "10s" + ## pong wait + # pong_wait="60s" + ## ping period + # ping_period="54s" + ## write_wait + # write_wait="10s" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Optional username and password to accept for HTTP basic authentication. + ## You probably want to make sure you have TLS configured above for this. + # basic_username = "foobar" + # basic_password = "barfoo" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "cl_104" +``` + +## Metrics + +Metrics are collected from the part of the request specified by the +`data_source` param and are parsed depending on the value of `data_format`. + +## Example Output + +## Troubleshooting diff --git a/plugins/inputs/cl_104/cl_104.go b/plugins/inputs/cl_104/cl_104.go new file mode 100644 index 000000000..7a2b74122 --- /dev/null +++ b/plugins/inputs/cl_104/cl_104.go @@ -0,0 +1,309 @@ +//go:generate ../../../tools/readme_config_includer/generator +package cl_104 + +import ( + "context" + "crypto/subtle" + "crypto/tls" + _ "embed" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "regexp" + "sync" + "time" + + "github.com/gorilla/websocket" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + common_tls "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +var once sync.Once + +type CL104 struct { + ServiceAddress string `toml:"service_address"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + PathCl string `toml:"path_cl"` + PathUp string `toml:"path_up"` + PongWait config.Duration `toml:"pong_wait"` + PingPeriod config.Duration `toml:"ping_period"` + WriteWait config.Duration `toml:"write_wait"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + + common_tls.ServerConfig + tlsConf *tls.Config + + timeFunc + Log telegraf.Logger + + wg sync.WaitGroup + close chan struct{} + + listener net.Listener + url *url.URL + + route map[string]func(res http.ResponseWriter, req *http.Request) + upChan chan []byte // confirm + clChan chan []byte // command + + telegraf.Parser + acc telegraf.Accumulator +} + +type msg struct { + TI int `json:"ti"` + COT int `json:"cot"` + PN int `json:"pn"` + CA int `json:"ca"` + Infos []any `json:"infos"` +} + +type wsMsg struct { + mt int + data []byte +} + +type wsSession struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + ctrlCh chan wsMsg +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + WriteBufferPool: &sync.Pool{ + New: func() any { + return make([]byte, 1024) + }, + }, +} + +// timeFunc provides a timestamp for the metrics +type timeFunc func() time.Time + +func (*CL104) SampleConfig() string { + return sampleConfig +} + +func (h *CL104) Init() error { + tlsConf, err := h.ServerConfig.TLSConfig() + if err != nil { + return err + } + + protoRegex := regexp.MustCompile(`\w://`) + if !protoRegex.MatchString(h.ServiceAddress) { + h.ServiceAddress = "tcp://" + h.ServiceAddress + } + + u, err := url.Parse(h.ServiceAddress) + if err != nil { + return fmt.Errorf("parsing address failed: %w", err) + } + + h.url = u + h.tlsConf = tlsConf + + h.route = map[string]func(res http.ResponseWriter, req *http.Request){ + h.PathCl: h.serveClstream, + h.PathUp: h.serveUpstream, + } + h.upChan = make(chan []byte, 16) + h.clChan = make(chan []byte, 16) + + return nil +} + +func (h *CL104) SetParser(parser telegraf.Parser) { + h.Parser = parser +} + +func (h *CL104) Start(acc telegraf.Accumulator) error { + + var listener net.Listener + var err error + if h.tlsConf != nil { + listener, err = tls.Listen(h.url.Scheme, h.url.Host, h.tlsConf) + } else { + listener, err = net.Listen(h.url.Scheme, h.url.Host) + } + if err != nil { + return err + } + h.listener = listener + + if h.ReadTimeout < config.Duration(time.Second) { + h.ReadTimeout = config.Duration(time.Second * 10) + } + if h.WriteTimeout < config.Duration(time.Second) { + h.WriteTimeout = config.Duration(time.Second * 10) + } + if h.PongWait <= config.Duration(time.Second) { + h.PongWait = config.Duration(time.Second * 60) + } + if h.PingPeriod <= config.Duration(time.Second) { + h.PingPeriod = (h.PongWait * 9) / 10 + } + if h.WriteWait <= config.Duration(time.Second) { + h.PongWait = config.Duration(time.Second * 10) + } + + h.acc = acc + + server := h.createHTTPServer() + + h.wg.Add(1) + go func() { + defer h.wg.Done() + if err := server.Serve(h.listener); err != nil { + if !errors.Is(err, net.ErrClosed) { + h.Log.Errorf("Serve failed: %v", err) + } + close(h.close) + } + }() + + h.Log.Infof("Listening on %s", h.listener.Addr().String()) + + return nil +} + +func (*CL104) Gather(telegraf.Accumulator) error { + return nil +} + +func (h *CL104) Stop() { + if h.listener != nil { + h.listener.Close() + } + h.wg.Wait() +} + +// ServeHTTP implements [http.Handler] +func (h *CL104) ServeHTTP(res http.ResponseWriter, req *http.Request) { + + handler, ok := h.route[req.URL.Path] + if !ok { + handler = http.NotFound + } + + h.authenticateIfSet(handler, res, req) +} + +func (h *CL104) createHTTPServer() *http.Server { + return &http.Server{ + Addr: h.ServiceAddress, + Handler: h, + ReadTimeout: time.Duration(h.ReadTimeout), + WriteTimeout: time.Duration(h.WriteTimeout), + TLSConfig: h.tlsConf, + } +} + +func (h *CL104) authenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { + if h.BasicUsername != "" && h.BasicPassword != "" { + reqUsername, reqPassword, ok := req.BasicAuth() + if !ok || + subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.BasicUsername)) != 1 || + subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.BasicPassword)) != 1 { + http.Error(res, "Unauthorized.", http.StatusUnauthorized) + return + } + } + + handler(res, req) +} + +func init() { + inputs.Add("cl_104", func() telegraf.Input { + return &CL104{ + timeFunc: time.Now, + close: make(chan struct{}), + } + }) +} + +func (h *CL104) writeControl(session *wsSession, mt int, payload []byte) error { + select { + case <-session.ctx.Done(): + return session.ctx.Err() + case session.ctrlCh <- wsMsg{mt: mt, data: payload}: + return nil + case <-time.After(time.Duration(h.WriteWait)): + return errors.New("write control timeout") + } +} + +func (h *CL104) setUpSessionConn(session *wsSession) error { + session.conn.SetPongHandler(func(appData string) error { + session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait))) + return nil + }) + + session.conn.SetPingHandler(func(appData string) error { + if err := h.writeControl(session, websocket.PongMessage, []byte(appData)); err != nil { + session.cancel() + return err + } + return nil + }) + + session.conn.SetCloseHandler(func(code int, text string) error { + session.cancel() + return nil + }) + + return session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait))) +} + +func (h *CL104) sendPing(session *wsSession) { + + ticker := time.NewTicker(time.Duration(h.PingPeriod)) + defer ticker.Stop() + + for { + select { + case <-session.ctx.Done(): + return + case <-ticker.C: + err := h.writeControl(session, websocket.PingMessage, nil) + if err != nil { + session.cancel() + return + } + } + } +} + +func tooLarge(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusRequestEntityTooLarge) + _, err := res.Write([]byte(`{"code":1,"msg":"http: request body too large"}`)) + return err +} + +func methodNotAllowed(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusMethodNotAllowed) + _, err := res.Write([]byte(`{"code":1,"msg":"http: method not allowed"}`)) + return err +} + +func badRequest(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusBadRequest) + _, err := res.Write([]byte(`{"code":1,"msg":"http: bad request"}`)) + return err +} diff --git a/plugins/inputs/cl_104/cl_104_test.go b/plugins/inputs/cl_104/cl_104_test.go new file mode 100644 index 000000000..091a1e942 --- /dev/null +++ b/plugins/inputs/cl_104/cl_104_test.go @@ -0,0 +1,33 @@ +package cl_104 + +import ( + "time" + + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +const ( + testMsg = `{"ti":30,"cot":3,"ca":1,"infos":[{"ioa":121,"val":1,"q":0,"ms":1768205453000}]}` + badMsg = "blahblahblah: 42\n" + emptyMsg = "" + + basicUsername = "test-username-please-ignore" + basicPassword = "super-secure-password!" +) + +func newTestCL104() (*CL104, error) { + parser := &influx.Parser{} + if err := parser.Init(); err != nil { + return nil, err + } + + listener := &CL104{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + Parser: parser, + timeFunc: time.Now, + close: make(chan struct{}), + } + return listener, nil +} diff --git a/plugins/inputs/cl_104/clstream.go b/plugins/inputs/cl_104/clstream.go new file mode 100644 index 000000000..b3858d716 --- /dev/null +++ b/plugins/inputs/cl_104/clstream.go @@ -0,0 +1,195 @@ +package cl_104 + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/influxdata/telegraf/internal" +) + +var clConnNum int64 + +func (h *CL104) serveClstream(res http.ResponseWriter, req *http.Request) { + select { + case <-h.close: + res.WriteHeader(http.StatusGone) + return + default: + if atomic.SwapInt64(&clConnNum, 1) > 0 { + res.WriteHeader(http.StatusConflict) + return + } + } + + conn, err := upgrader.Upgrade(res, req, nil) + if err != nil { + h.Log.Error(err) + return + } + defer conn.Close() + + stopCtx, stopCancel := context.WithCancel(context.Background()) + defer stopCancel() + + session := &wsSession{ + conn: conn, + ctx: stopCtx, + cancel: stopCancel, + ctrlCh: make(chan wsMsg, 2), + } + + if err := h.setUpSessionConn(session); err != nil { + h.Log.Error(err) + return + } + + h.startClWorkers(session) +} + +func (h *CL104) startClWorkers(session *wsSession) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + h.monitorClWrite(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.monitorClRead(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.sendPing(session) + }() + + wg.Wait() + atomic.SwapInt64(&clConnNum, 0) +} + +func (h *CL104) monitorClWrite(session *wsSession) { + + var err error + for { + select { + case <-session.ctx.Done(): + return + case ctrl := <-session.ctrlCh: + err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + case msg := <-h.clChan: + err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + session.cancel() + return + } + } + } +} + +func (h *CL104) monitorClRead(session *wsSession) { + for { + select { + case <-session.ctx.Done(): + return + default: + mt, rm, err := session.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + session.cancel() + return + } + if ce, ok := err.(*websocket.CloseError); ok { + h.Log.Infof("client closed with code:", ce.Code, "text:", ce.Text) + session.cancel() + return + } + h.Log.Error("server read:", err) + session.cancel() + return + } + + switch mt { + case websocket.TextMessage: + if err := h.fromClstream(session, rm); err != nil { + h.Log.Error(err) + } + default: + h.Log.Info("not text:", string(rm)) + } + } + } +} + +func (h *CL104) fromClstream(session *wsSession, m []byte) error { + msg := new(msg) + if err := json.Unmarshal(m, msg); err != nil { + return err + } + + switch { + case msg.TI >= 1 && msg.TI <= 40, msg.TI >= 110 && msg.TI <= 119: + metrics, err := h.Parse(m) + if err != nil { + h.Log.Debugf("parse error: %s", err.Error()) + return err + } + + if len(metrics) == 0 { + once.Do(func() { + h.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + + for _, m := range metrics { + h.acc.AddMetric(m) + } + + case msg.TI >= 45 && msg.TI <= 69, msg.TI >= 100 && msg.TI <= 109: + select { + case <-session.ctx.Done(): + return session.ctx.Err() + default: + msg.PN = msg.COT & 0x40 + msg.COT = msg.COT & 0x3F + if m, err := json.Marshal(msg); err != nil { + return err + } else { + select { + case h.upChan <- m: + case <-time.After(time.Second * 5): + h.Log.Error("drop up msg:", msg) + } + } + + } + + default: + return fmt.Errorf("invalid TI: %d", msg.TI) + } + + return nil +} diff --git a/plugins/inputs/cl_104/sample.conf b/plugins/inputs/cl_104/sample.conf new file mode 100644 index 000000000..b29b26ea4 --- /dev/null +++ b/plugins/inputs/cl_104/sample.conf @@ -0,0 +1,44 @@ +# Generic HTTP write listener +[[inputs.cl_104]] + ## Address to host HTTP listener on + ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp + ## if unix network type provided it should be followed by absolute path for unix socket + service_address = "tcp://:8080" + ## service_address = "tcp://:8443" + + ## Paths to listen to. + # path_cl="/api/104" + # path_up="/api/104up" + + ## maximum duration before timing out read of the request + # read_timeout = "10s" + ## maximum duration before timing out write of the response + # write_timeout = "10s" + ## pong wait + # pong_wait="60s" + ## ping period + # ping_period="54s" + ## write_wait + # write_wait="10s" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Optional username and password to accept for HTTP basic authentication. + ## You probably want to make sure you have TLS configured above for this. + # basic_username = "foobar" + # basic_password = "barfoo" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "cl_104" diff --git a/plugins/inputs/cl_104/upstream.go b/plugins/inputs/cl_104/upstream.go new file mode 100644 index 000000000..299c7a7b4 --- /dev/null +++ b/plugins/inputs/cl_104/upstream.go @@ -0,0 +1,159 @@ +package cl_104 + +import ( + "context" + "encoding/json" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" +) + +var upConnNum int64 + +func (h *CL104) serveUpstream(res http.ResponseWriter, req *http.Request) { + select { + case <-h.close: + res.WriteHeader(http.StatusGone) + return + default: + if atomic.SwapInt64(&upConnNum, 1) > 0 { + res.WriteHeader(http.StatusConflict) + return + } + } + + conn, err := upgrader.Upgrade(res, req, nil) + if err != nil { + h.Log.Error(err) + return + } + defer conn.Close() + + stopCtx, stopCancel := context.WithCancel(context.Background()) + defer stopCancel() + + session := &wsSession{ + conn: conn, + ctx: stopCtx, + cancel: stopCancel, + ctrlCh: make(chan wsMsg, 2), + } + + if err := h.setUpSessionConn(session); err != nil { + h.Log.Error(err) + return + } + + h.startUpWorkers(session) +} + +func (h *CL104) startUpWorkers(session *wsSession) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + h.monitorUpWrite(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.monitorUpRead(session) + }() + + wg.Add(1) + go func() { + defer wg.Done() + h.sendPing(session) + }() + + wg.Wait() + atomic.SwapInt64(&upConnNum, 0) +} + +func (h *CL104) monitorUpWrite(session *wsSession) { + var err error + for { + select { + case <-session.ctx.Done(): + return + case ctrl := <-session.ctrlCh: + err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + case msg := <-h.upChan: + err = session.conn.SetWriteDeadline(time.Now().Add(time.Duration(h.WriteWait))) + if err != nil { + session.cancel() + return + } + err = session.conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + session.cancel() + return + } + } + } +} + +func (h *CL104) monitorUpRead(session *wsSession) { + for { + select { + case <-session.ctx.Done(): + return + default: + mt, rm, err := session.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + session.cancel() + return + } + if ce, ok := err.(*websocket.CloseError); ok { + h.Log.Infof("client closed with code:", ce.Code, "text:", ce.Text) + session.cancel() + return + } + h.Log.Error("server read:", err) + session.cancel() + return + } + + switch mt { + case websocket.TextMessage: + if err := h.fromUpstream(rm); err != nil { + h.Log.Error(err) + } + default: + h.Log.Info("not text:", string(rm)) + } + } + } +} + +func (h *CL104) fromUpstream(m []byte) error { + msg := new(msg) + if err := json.Unmarshal(m, msg); err != nil { + return err + } + + if msg.TI >= 45 && msg.TI <= 69 || msg.TI >= 100 && msg.TI <= 109 { + select { + case h.clChan <- m: + case <-time.After(time.Second * 5): + h.Log.Error("drop cl msg:", msg) + } + } + + return nil +} diff --git a/plugins/parsers/all/cl_104.go b/plugins/parsers/all/cl_104.go new file mode 100644 index 000000000..f945df37d --- /dev/null +++ b/plugins/parsers/all/cl_104.go @@ -0,0 +1,5 @@ +//go:build custom || parsers || parsers.cl_104 + +package all + +import _ "github.com/influxdata/telegraf/plugins/parsers/cl_104" // register plugin diff --git a/plugins/parsers/all/phasor_binary.go b/plugins/parsers/all/phasor_binary.go index 5297df1d1..49b3fe979 100644 --- a/plugins/parsers/all/phasor_binary.go +++ b/plugins/parsers/all/phasor_binary.go @@ -1,4 +1,4 @@ -//go:build !custom || parsers || parsers.phasor_binary +//go:build custom || parsers || parsers.phasor_binary package all diff --git a/plugins/parsers/cl_104/README.md b/plugins/parsers/cl_104/README.md new file mode 100644 index 000000000..d0b385b45 --- /dev/null +++ b/plugins/parsers/cl_104/README.md @@ -0,0 +1,54 @@ +# CL 104 Parser Plugin + +This parser takes valid JSON input and turns it into line protocol. + +## Configuration + +```toml +[[inputs.cl_104]] + ## Address to host HTTP listener on + ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp + ## if unix network type provided it should be followed by absolute path for unix socket + service_address = "tcp://:8080" + ## service_address = "tcp://:8443" + + ## Paths to listen to. + # path_cl="/api/104" + # path_up="/api/104up" + + ## maximum duration before timing out read of the request + # read_timeout = "10s" + ## maximum duration before timing out write of the response + # write_timeout = "10s" + ## pong wait + # pong_wait="60s" + ## ping period + # ping_period="54s" + ## write_wait + # write_wait="10s" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Optional username and password to accept for HTTP basic authentication. + ## You probably want to make sure you have TLS configured above for this. + # basic_username = "foobar" + # basic_password = "barfoo" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "cl_104" +``` + +--- + diff --git a/plugins/parsers/cl_104/parser.go b/plugins/parsers/cl_104/parser.go new file mode 100644 index 000000000..d5cbf192a --- /dev/null +++ b/plugins/parsers/cl_104/parser.go @@ -0,0 +1,109 @@ +package cl_104 + +import ( + "encoding/json" + "errors" + "strconv" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" +) + +// Parser +type Parser struct { + DefaultMetricName string + DefaultTags map[string]string + Log telegraf.Logger + + // **** The struct fields below this comment are used for processing individual configs **** + + // measurementName is the name of the current config used in each line protocol + measurementName string + + // parseMutex is here because Parse() is not threadsafe. If it is made threadsafe at some point, then we won't need it anymore. + parseMutex sync.Mutex +} + +type info struct { + IOA int `json:"ioa"` + Val float64 `json:"val"` + Q int `json:"q"` + MS int64 `json:"ms"` +} + +type msg struct { + TI int `json:"ti"` + COT int `json:"cot"` + PN int `json:"pn"` + CA int `json:"ca"` + Infos []*info `json:"infos"` +} + +func (p *Parser) Init() error { + if len(p.measurementName) == 0 { + p.measurementName = "cl104" + } + return nil +} + +func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { + msg := new(msg) + if err := json.Unmarshal(input, msg); err != nil { + return nil, err + } + + metrics := make([]telegraf.Metric, 0, len(msg.Infos)) + for _, info := range msg.Infos { + if info == nil { + continue + } + + tags := map[string]string{ + "ca": strconv.Itoa(msg.CA), + "cot": strconv.Itoa(msg.COT), + "ioa": strconv.Itoa(info.IOA), + "ti": strconv.Itoa(msg.TI), + } + + fields := map[string]any{ + "val": info.Val, + } + + tm := time.Now() + if info.MS > 0 { + tm = time.UnixMilli(info.MS) + } + + metrics = append(metrics, metric.New( + p.measurementName, + tags, + fields, + tm, + )) + } + + return metrics, nil +} + +func (*Parser) ParseLine(string) (telegraf.Metric, error) { + return nil, errors.New("parsing line is not implemented") +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + +func init() { + // Register all variants + parsers.Add("cl_104", + func(defaultMetricName string) telegraf.Parser { + return &Parser{ + DefaultMetricName: defaultMetricName, + measurementName: "cl104", + } + }, + ) +} diff --git a/plugins/parsers/cl_104/parser_test.go b/plugins/parsers/cl_104/parser_test.go new file mode 100644 index 000000000..4e1ec45b9 --- /dev/null +++ b/plugins/parsers/cl_104/parser_test.go @@ -0,0 +1 @@ +package cl_104