// Package kafka_subscriber implements the "cl_kafka_subscriber" Telegraf input
// plugin.
//
// The code in this file runs an HTTP(S) listener that accepts POST requests on
// a configured path and reads their (optionally gzip- or snappy-compressed)
// bodies. The Kafka subscription and outbound call logic are still TODO stubs.
package kafka_subscriber

import (
	"compress/gzip"
	"crypto/subtle"
	"crypto/tls"
	_ "embed"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"regexp"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/golang/snappy"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/common/kafka"
	common_tls "github.com/influxdata/telegraf/plugins/common/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
)

//go:embed sample.conf
var sampleConfig string

const (
	// defaultMaxBodySize is the request body limit applied when MaxBodySize
	// is not configured.
	defaultMaxBodySize = 100 * 1024 * 1024

	// defaultTimeout replaces read, write and call timeouts shorter than one
	// second.
	defaultTimeout = 10 * time.Second
)

// schemeRegex detects whether an address already carries a "scheme://" part.
// It is compiled once at package scope instead of on every Init call.
var schemeRegex = regexp.MustCompile(`\w://`)

// KafkaSubscriber is a Telegraf service input that exposes an HTTP(S)
// endpoint. Requests are optionally protected by basic authentication and
// their bodies are limited to MaxBodySize bytes.
type KafkaSubscriber struct {
	// Address is the listen address; "tcp://" is prepended when no scheme
	// is given.
	Address string `toml:"address"`
	// Path is the only URL path that is served; all others yield 404.
	Path string `toml:"path"`
	// Brokers is not used by the code in this file yet
	// (presumably the Kafka broker list; confirm once subscribe is implemented).
	Brokers      []string        `toml:"brokers"`
	ReadTimeout  config.Duration `toml:"read_timeout"`
	WriteTimeout config.Duration `toml:"write_timeout"`
	// BasicUsername and BasicPassword enable basic authentication when both
	// are non-empty.
	BasicUsername string `toml:"basic_username"`
	BasicPassword string `toml:"basic_password"`
	// MaxBodySize caps the size of an accepted request body, before and
	// after decompression.
	MaxBodySize config.Size `toml:"max_body_size"`
	// CallURL and CallTimeout are not used by the code in this file yet.
	CallURL     string          `toml:"call_url"`
	CallTimeout config.Duration `toml:"call_timeout"`

	Log telegraf.Logger `toml:"-"`

	common_tls.ServerConfig
	tlsConf *tls.Config

	wg sync.WaitGroup
	// close is closed once the HTTP server has stopped serving.
	close chan struct{}

	listener net.Listener
	url      *url.URL

	client     sarama.Client
	consumer   sarama.Consumer
	httpClient *http.Client

	parser telegraf.Parser
	acc    telegraf.Accumulator
}

// SampleConfig returns the embedded sample configuration.
func (*KafkaSubscriber) SampleConfig() string {
	return sampleConfig
}

// Init validates the configuration: it builds the server TLS configuration,
// normalizes and parses the listen address and creates the HTTP client.
func (k *KafkaSubscriber) Init() error {
	kafka.SetLogger(k.Log.Level())

	tlsConf, err := k.ServerConfig.TLSConfig()
	if err != nil {
		return err
	}

	if !schemeRegex.MatchString(k.Address) {
		k.Address = "tcp://" + k.Address
	}

	u, err := url.Parse(k.Address)
	if err != nil {
		return fmt.Errorf("parsing address failed: %w", err)
	}

	k.tlsConf = tlsConf
	k.url = u

	// TODO initialization

	k.httpClient = &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				Timeout: time.Second,
			}).DialContext,
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 100,
			IdleConnTimeout:     90 * time.Second,
		},
	}

	return nil
}

// SetParser stores the parser assigned to this plugin.
func (k *KafkaSubscriber) SetParser(parser telegraf.Parser) {
	k.parser = parser
}

// Start opens the (TLS) listener, applies defaults for unset limits and
// timeouts and serves HTTP requests in a background goroutine until the
// listener is closed by Stop.
func (k *KafkaSubscriber) Start(acc telegraf.Accumulator) error {
	var listener net.Listener
	var err error
	if k.tlsConf != nil {
		listener, err = tls.Listen(k.url.Scheme, k.url.Host, k.tlsConf)
	} else {
		listener, err = net.Listen(k.url.Scheme, k.url.Host)
	}
	if err != nil {
		return err
	}
	k.listener = listener

	if k.MaxBodySize == 0 {
		k.MaxBodySize = config.Size(defaultMaxBodySize)
	}
	if k.ReadTimeout < config.Duration(time.Second) {
		k.ReadTimeout = config.Duration(defaultTimeout)
	}
	if k.WriteTimeout < config.Duration(time.Second) {
		k.WriteTimeout = config.Duration(defaultTimeout)
	}
	if k.CallTimeout < config.Duration(time.Second) {
		k.CallTimeout = config.Duration(defaultTimeout)
	}

	k.acc = acc

	// TODO

	// A fresh shutdown channel is created on every Start. This keeps the
	// plugin usable when it was not built by the registered factory (nil
	// channel) and when it is started again after Stop (already closed
	// channel); closing the old channel in either case would panic.
	stopped := make(chan struct{})
	k.close = stopped

	server := k.createHTTPServer()

	k.wg.Add(1)
	go func() {
		defer k.wg.Done()

		// Serve always returns a non-nil error; a closed listener is the
		// regular shutdown path and is not worth reporting.
		if err := server.Serve(listener); err != nil && !errors.Is(err, net.ErrClosed) {
			k.Log.Errorf("Serve failed: %v", err)
		}
		close(stopped)
	}()

	k.Log.Infof("Listening on %s", listener.Addr().String())

	return nil
}

// Gather is a no-op; the visible code only reacts to incoming HTTP requests.
func (*KafkaSubscriber) Gather(telegraf.Accumulator) error {
	return nil
}

// Stop closes the listener and waits for the serving goroutine to exit.
func (k *KafkaSubscriber) Stop() {
	if k.listener != nil {
		if err := k.listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
			k.Log.Debugf("Closing listener failed: %v", err)
		}
	}
	k.wg.Wait()
}

// ServeHTTP implements [http.Handler]. It enforces basic authentication when
// both username and password are configured and dispatches requests for the
// configured path to handle; any other path yields 404.
func (k *KafkaSubscriber) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	if k.BasicUsername != "" && k.BasicPassword != "" {
		reqUsername, reqPassword, ok := req.BasicAuth()
		if !ok ||
			subtle.ConstantTimeCompare([]byte(reqUsername), []byte(k.BasicUsername)) != 1 ||
			subtle.ConstantTimeCompare([]byte(reqPassword), []byte(k.BasicPassword)) != 1 {
			http.Error(res, "Unauthorized.", http.StatusUnauthorized)
			return
		}
	}

	if req.URL.Path != k.Path {
		http.NotFound(res, req)
		return
	}
	k.handle(res, req)
}

// createHTTPServer builds the HTTP server from the current configuration.
func (k *KafkaSubscriber) createHTTPServer() *http.Server {
	return &http.Server{
		Addr:         k.Address,
		Handler:      k,
		ReadTimeout:  time.Duration(k.ReadTimeout),
		WriteTimeout: time.Duration(k.WriteTimeout),
		TLSConfig:    k.tlsConf,
	}
}

// handle processes a request for the configured path. It answers 410 once the
// server has stopped, 405 for non-POST requests and 413 when the declared
// content length exceeds the body limit; otherwise the body is collected.
func (k *KafkaSubscriber) handle(res http.ResponseWriter, req *http.Request) {
	select {
	case <-k.close:
		res.WriteHeader(http.StatusGone)
		return
	default:
	}

	if req.Method != http.MethodPost {
		k.respond(res, "method-not-allowed", methodNotAllowed)
		return
	}

	// Reject obviously oversized requests before reading any of the body.
	if req.ContentLength > k.bodyLimit() {
		k.respond(res, "too-large", tooLarge)
		return
	}

	body, ok := k.collectBody(res, req)
	if !ok {
		return
	}

	// TODO
	_ = body
}

// bodyLimit returns the effective maximum body size in bytes. A non-positive
// MaxBodySize (e.g. when a request is handled before Start applied the
// default) falls back to defaultMaxBodySize.
func (k *KafkaSubscriber) bodyLimit() int64 {
	if k.MaxBodySize <= 0 {
		return defaultMaxBodySize
	}
	return int64(k.MaxBodySize)
}

// respond writes one of the canned error responses and logs, at debug level,
// a failure to deliver it.
func (k *KafkaSubscriber) respond(res http.ResponseWriter, name string, write func(http.ResponseWriter) error) {
	if err := write(res); err != nil {
		k.Log.Debugf("error in %s: %v", name, err)
	}
}

// rejectBody answers a failed body read: 413 when the size limit was
// exceeded, 400 (with the cause logged at debug level) for anything else.
func (k *KafkaSubscriber) rejectBody(res http.ResponseWriter, err error) {
	var tooBig *http.MaxBytesError
	if errors.As(err, &tooBig) {
		k.respond(res, "too-large", tooLarge)
		return
	}
	k.Log.Debug(err.Error())
	k.respond(res, "bad-request", badRequest)
}

// collectBody reads the request body, transparently decoding the "gzip" and
// "snappy" content encodings. The body limit applies to every encoding, both
// to the bytes read from the client and to the decoded result.
//
// It returns the decoded body and true on success. On failure an error
// response has already been written and the result is (nil, false).
func (k *KafkaSubscriber) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
	limit := k.bodyLimit()
	defer req.Body.Close()

	switch req.Header.Get("Content-Encoding") {
	case "gzip":
		r, err := gzip.NewReader(req.Body)
		if err != nil {
			k.Log.Debug(err.Error())
			k.respond(res, "bad-request", badRequest)
			return nil, false
		}
		defer r.Close()

		// The limit wraps the decompressing reader so that a small
		// compressed payload cannot expand without bound.
		body, err := io.ReadAll(http.MaxBytesReader(res, r, limit))
		if err != nil {
			k.rejectBody(res, err)
			return nil, false
		}
		return body, true

	case "snappy":
		compressed, err := io.ReadAll(http.MaxBytesReader(res, req.Body, limit))
		if err != nil {
			k.rejectBody(res, err)
			return nil, false
		}

		// The block header announces the decoded size; check it before
		// allocating the output buffer.
		decodedLen, err := snappy.DecodedLen(compressed)
		if err != nil {
			k.Log.Debug(err.Error())
			k.respond(res, "bad-request", badRequest)
			return nil, false
		}
		if int64(decodedLen) > limit {
			k.respond(res, "too-large", tooLarge)
			return nil, false
		}

		// snappy block format is only supported by decode/encode not snappy reader/writer
		body, err := snappy.Decode(nil, compressed)
		if err != nil {
			k.Log.Debug(err.Error())
			k.respond(res, "bad-request", badRequest)
			return nil, false
		}
		return body, true

	default:
		body, err := io.ReadAll(http.MaxBytesReader(res, req.Body, limit))
		if err != nil {
			k.rejectBody(res, err)
			return nil, false
		}
		return body, true
	}
}

func (k *KafkaSubscriber) subscribe() error {
	// TODO
	return nil
}

type callRequest struct {
	// TODO
}

func (k *KafkaSubscriber) call(req *callRequest) error {
	// TODO
	return nil
}

func init() {
	inputs.Add("cl_kafka_subscriber", func() telegraf.Input {
		return &KafkaSubscriber{
			Address: ":8877",
			close:   make(chan struct{}),
		}
	})
}

// writeJSONError sends body as a JSON response with the given status code.
func writeJSONError(res http.ResponseWriter, status int, body string) error {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(status)
	_, err := res.Write([]byte(body))
	return err
}

// tooLarge answers with 413 Request Entity Too Large.
func tooLarge(res http.ResponseWriter) error {
	return writeJSONError(res, http.StatusRequestEntityTooLarge, `{"error":"http: request body too large"}`)
}

// methodNotAllowed answers with 405 Method Not Allowed.
func methodNotAllowed(res http.ResponseWriter) error {
	return writeJSONError(res, http.StatusMethodNotAllowed, `{"error":"http: method not allowed"}`)
}

// badRequest answers with 400 Bad Request.
func badRequest(res http.ResponseWriter) error {
	return writeJSONError(res, http.StatusBadRequest, `{"error":"http: bad request"}`)
}