//go:generate ../../../tools/readme_config_includer/generator package cl_104 import ( "context" "crypto/subtle" "crypto/tls" _ "embed" "errors" "fmt" "net" "net/http" "net/url" "regexp" "sync" "time" "github.com/gorilla/websocket" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" common_tls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string var once sync.Once type CL104 struct { ServiceAddress string `toml:"service_address"` ReadTimeout config.Duration `toml:"read_timeout"` WriteTimeout config.Duration `toml:"write_timeout"` PathCl string `toml:"path_cl"` PathUp string `toml:"path_up"` PongWait config.Duration `toml:"pong_wait"` PingPeriod config.Duration `toml:"ping_period"` WriteWait config.Duration `toml:"write_wait"` BasicUsername string `toml:"basic_username"` BasicPassword string `toml:"basic_password"` common_tls.ServerConfig tlsConf *tls.Config timeFunc Log telegraf.Logger wg sync.WaitGroup close chan struct{} listener net.Listener url *url.URL route map[string]func(res http.ResponseWriter, req *http.Request) upChan chan []byte // confirm clChan chan []byte // command telegraf.Parser acc telegraf.Accumulator } type msg struct { TI int `json:"ti"` COT int `json:"cot"` PN int `json:"pn"` CA int `json:"ca"` Infos []any `json:"infos"` } type wsMsg struct { mt int data []byte } type wsSession struct { conn *websocket.Conn ctx context.Context cancel context.CancelFunc ctrlCh chan wsMsg } var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, WriteBufferPool: &sync.Pool{ New: func() any { return make([]byte, 1024) }, }, } // timeFunc provides a timestamp for the metrics type timeFunc func() time.Time func (*CL104) SampleConfig() string { return sampleConfig } func (h *CL104) Init() error { tlsConf, err := h.ServerConfig.TLSConfig() if err != nil { return err } protoRegex := regexp.MustCompile(`\w://`) if !protoRegex.MatchString(h.ServiceAddress) { h.ServiceAddress = "tcp://" + h.ServiceAddress } u, err := url.Parse(h.ServiceAddress) if err != nil { return fmt.Errorf("parsing address failed: %w", err) } h.url = u h.tlsConf = tlsConf h.route = map[string]func(res http.ResponseWriter, req *http.Request){ h.PathCl: h.serveClstream, h.PathUp: h.serveUpstream, } h.upChan = make(chan []byte, 16) h.clChan = make(chan []byte, 16) return nil } func (h *CL104) SetParser(parser telegraf.Parser) { h.Parser = parser } func (h *CL104) Start(acc telegraf.Accumulator) error { var listener net.Listener var err error if h.tlsConf != nil { listener, err = tls.Listen(h.url.Scheme, h.url.Host, h.tlsConf) } else { listener, err = net.Listen(h.url.Scheme, h.url.Host) } if err != nil { return err } h.listener = listener if h.ReadTimeout < config.Duration(time.Second) { h.ReadTimeout = config.Duration(time.Second * 10) } if h.WriteTimeout < config.Duration(time.Second) { h.WriteTimeout = config.Duration(time.Second * 10) } if h.PongWait <= config.Duration(time.Second) { h.PongWait = config.Duration(time.Second * 60) } if h.PingPeriod <= config.Duration(time.Second) { h.PingPeriod = (h.PongWait * 9) / 10 } if h.WriteWait <= config.Duration(time.Second) { h.PongWait = config.Duration(time.Second * 10) } h.acc = acc server := h.createHTTPServer() h.wg.Add(1) go func() { defer h.wg.Done() if err := server.Serve(h.listener); err != nil { if !errors.Is(err, net.ErrClosed) { h.Log.Errorf("Serve failed: %v", err) } close(h.close) } }() h.Log.Infof("Listening on %s", h.listener.Addr().String()) return nil } func (*CL104) Gather(telegraf.Accumulator) error { return nil } func (h *CL104) Stop() { if h.listener != nil { h.listener.Close() } h.wg.Wait() } // ServeHTTP implements [http.Handler] func (h *CL104) ServeHTTP(res http.ResponseWriter, req *http.Request) { handler, ok := h.route[req.URL.Path] if !ok { handler = http.NotFound } h.authenticateIfSet(handler, res, req) } func (h *CL104) createHTTPServer() *http.Server { return &http.Server{ Addr: h.ServiceAddress, Handler: h, ReadTimeout: time.Duration(h.ReadTimeout), WriteTimeout: time.Duration(h.WriteTimeout), TLSConfig: h.tlsConf, } } func (h *CL104) authenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { if h.BasicUsername != "" && h.BasicPassword != "" { reqUsername, reqPassword, ok := req.BasicAuth() if !ok || subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.BasicUsername)) != 1 || subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.BasicPassword)) != 1 { http.Error(res, "Unauthorized.", http.StatusUnauthorized) return } } handler(res, req) } func init() { inputs.Add("cl_104", func() telegraf.Input { return &CL104{ timeFunc: time.Now, close: make(chan struct{}), } }) } func (h *CL104) writeControl(session *wsSession, mt int, payload []byte) error { select { case <-session.ctx.Done(): return session.ctx.Err() case session.ctrlCh <- wsMsg{mt: mt, data: payload}: return nil case <-time.After(time.Duration(h.WriteWait)): return errors.New("write control timeout") } } func (h *CL104) setUpSessionConn(session *wsSession) error { session.conn.SetPongHandler(func(appData string) error { session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait))) return nil }) session.conn.SetPingHandler(func(appData string) error { if err := h.writeControl(session, websocket.PongMessage, []byte(appData)); err != nil { session.cancel() return err } return nil }) session.conn.SetCloseHandler(func(code int, text string) error { session.cancel() return nil }) return session.conn.SetReadDeadline(time.Now().Add(time.Duration(h.PongWait))) } func (h *CL104) sendPing(session *wsSession) { ticker := time.NewTicker(time.Duration(h.PingPeriod)) defer ticker.Stop() for { select { case <-session.ctx.Done(): return case <-ticker.C: err := h.writeControl(session, websocket.PingMessage, nil) if err != nil { session.cancel() return } } } } func tooLarge(res http.ResponseWriter) error { res.Header().Set("Content-Type", "application/json") res.WriteHeader(http.StatusRequestEntityTooLarge) _, err := res.Write([]byte(`{"code":1,"msg":"http: request body too large"}`)) return err } func methodNotAllowed(res http.ResponseWriter) error { res.Header().Set("Content-Type", "application/json") res.WriteHeader(http.StatusMethodNotAllowed) _, err := res.Write([]byte(`{"code":1,"msg":"http: method not allowed"}`)) return err } func badRequest(res http.ResponseWriter) error { res.Header().Set("Content-Type", "application/json") res.WriteHeader(http.StatusBadRequest) _, err := res.Write([]byte(`{"code":1,"msg":"http: bad request"}`)) return err }