diff --git a/plugins/inputs/all/cl_kafka_subscriber.go b/plugins/inputs/all/cl_kafka_subscriber.go new file mode 100644 index 000000000..9847bed19 --- /dev/null +++ b/plugins/inputs/all/cl_kafka_subscriber.go @@ -0,0 +1,5 @@ +//go:build custom || inputs || inputs.cl_kafka_subscriber + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/cl_kafka_subscriber" // register plugin diff --git a/plugins/inputs/cl_kafka_subscriber/kafka_subscriber.go b/plugins/inputs/cl_kafka_subscriber/kafka_subscriber.go new file mode 100644 index 000000000..fb5cc7cc9 --- /dev/null +++ b/plugins/inputs/cl_kafka_subscriber/kafka_subscriber.go @@ -0,0 +1,317 @@ +package kafka_subscriber + +import ( + "compress/gzip" + "crypto/subtle" + "crypto/tls" + _ "embed" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "regexp" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/golang/snappy" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/kafka" + common_tls "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +type KafkaSubscriber struct { + Address string `toml:"address"` + Path string `toml:"path"` + Brokers []string `toml:"brokers"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + MaxBodySize config.Size `toml:"max_body_size"` + CallURL string `toml:"call_url"` + CallTimeout config.Duration `toml:"call_timeout"` + Log telegraf.Logger `toml:"-"` + common_tls.ServerConfig + tlsConf *tls.Config + wg sync.WaitGroup + close chan struct{} + listener net.Listener + url *url.URL + client sarama.Client + consumer sarama.Consumer + httpClient *http.Client + parser telegraf.Parser + acc telegraf.Accumulator +} + +func (*KafkaSubscriber) SampleConfig() string { + return sampleConfig +} + +func (k *KafkaSubscriber) Init() error { + kafka.SetLogger(k.Log.Level()) + + tlsConf, err := k.ServerConfig.TLSConfig() + if err != nil { + return err + } + + protoRegex := regexp.MustCompile(`\w://`) + if !protoRegex.MatchString(k.Address) { + k.Address = "tcp://" + k.Address + } + + u, err := url.Parse(k.Address) + if err != nil { + return fmt.Errorf("parsing address failed: %w", err) + } + + k.tlsConf = tlsConf + k.url = u + + // TODO initialization + + k.httpClient = &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + } + + return nil +} + +func (k *KafkaSubscriber) SetParser(parser telegraf.Parser) { + k.parser = parser +} + +func (k *KafkaSubscriber) Start(acc telegraf.Accumulator) error { + var listener net.Listener + var err error + if k.tlsConf != nil { + listener, err = tls.Listen(k.url.Scheme, k.url.Host, k.tlsConf) + } else { + listener, err = net.Listen(k.url.Scheme, k.url.Host) + } + if err != nil { + return err + } + k.listener = listener + + if k.MaxBodySize == 0 { + k.MaxBodySize = config.Size(100 * 1024 * 1024) + } + + if k.ReadTimeout < config.Duration(time.Second) { + k.ReadTimeout = config.Duration(time.Second * 10) + } + if k.WriteTimeout < config.Duration(time.Second) { + k.WriteTimeout = config.Duration(time.Second * 10) + } + if k.CallTimeout < config.Duration(time.Second) { + k.CallTimeout = config.Duration(time.Second * 10) + } + + k.acc = acc + + // TODO + + server := k.createHTTPServer() + + k.wg.Add(1) + go func() { + defer k.wg.Done() + if err := server.Serve(k.listener); err != nil { + if !errors.Is(err, net.ErrClosed) { + k.Log.Errorf("Serve failed: %v", err) + } + close(k.close) + } + }() + + k.Log.Infof("Listening on %s", k.listener.Addr().String()) + + return nil +} + +func (*KafkaSubscriber) Gather(telegraf.Accumulator) error { + return nil +} + +func (k *KafkaSubscriber) Stop() { + if k.listener != nil { + k.listener.Close() + } + k.wg.Wait() +} + +// ServeHTTP implements [http.Handler] +func (k *KafkaSubscriber) ServeHTTP(res http.ResponseWriter, req *http.Request) { + if k.BasicUsername != "" && k.BasicPassword != "" { + reqUsername, reqPassword, ok := req.BasicAuth() + if !ok || + subtle.ConstantTimeCompare([]byte(reqUsername), []byte(k.BasicUsername)) != 1 || + subtle.ConstantTimeCompare([]byte(reqPassword), []byte(k.BasicPassword)) != 1 { + http.Error(res, "Unauthorized.", http.StatusUnauthorized) + return + } + } + + if req.URL.Path != k.Path { + http.NotFound(res, req) + } else { + k.handle(res, req) + } +} + +func (k *KafkaSubscriber) createHTTPServer() *http.Server { + return &http.Server{ + Addr: k.Address, + Handler: k, + ReadTimeout: time.Duration(k.ReadTimeout), + WriteTimeout: time.Duration(k.WriteTimeout), + TLSConfig: k.tlsConf, + } +} + +func (k *KafkaSubscriber) handle(res http.ResponseWriter, req *http.Request) { + select { + case <-k.close: + res.WriteHeader(http.StatusGone) + return + default: + + } + + if req.Method != http.MethodPost { + if err := methodNotAllowed(res); err != nil { + k.Log.Debugf("error in method-not-allowed: %v", err) + } + return + } + + bytes, ok := k.collectBody(res, req) + if !ok { + return + } + + // TODO + _ = bytes +} + +func (h *KafkaSubscriber) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) { + encoding := req.Header.Get("Content-Encoding") + + switch encoding { + case "gzip": + r, err := gzip.NewReader(req.Body) + if err != nil { + h.Log.Debug(err.Error()) + if err := badRequest(res); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return nil, false + } + defer r.Close() + maxReader := http.MaxBytesReader(res, r, int64(h.MaxBodySize)) + bytes, err := io.ReadAll(maxReader) + if err != nil { + if err := tooLarge(res); err != nil { + h.Log.Debugf("error in too-large: %v", err) + } + return nil, false + } + return bytes, true + case "snappy": + defer req.Body.Close() + bytes, err := io.ReadAll(req.Body) + if err != nil { + h.Log.Debug(err.Error()) + if err := badRequest(res); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return nil, false + } + // snappy block format is only supported by decode/encode not snappy reader/writer + bytes, err = snappy.Decode(nil, bytes) + if err != nil { + h.Log.Debug(err.Error()) + if err := badRequest(res); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return nil, false + } + return bytes, true + default: + defer req.Body.Close() + bytes, err := io.ReadAll(req.Body) + if err != nil { + h.Log.Debug(err.Error()) + if err := badRequest(res); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return nil, false + } + return bytes, true + } +} + +func (k *KafkaSubscriber) subscribe() error { + + // TODO + return nil +} + +type callRequest struct { + + // TODO +} + +func (k *KafkaSubscriber) call(req *callRequest) error { + + // TODO + return nil +} + +func init() { + inputs.Add("cl_kafka_subscriber", func() telegraf.Input { + return &KafkaSubscriber{ + Address: ":8877", + close: make(chan struct{}), + } + }) +} + +func tooLarge(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusRequestEntityTooLarge) + _, err := res.Write([]byte(`{"error":"http: request body too large"}`)) + return err +} + +func methodNotAllowed(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusMethodNotAllowed) + _, err := res.Write([]byte(`{"error":"http: method not allowed"}`)) + return err +} + +func badRequest(res http.ResponseWriter) error { + res.Header().Set("Content-Type", "application/json") + res.WriteHeader(http.StatusBadRequest) + _, err := res.Write([]byte(`{"error":"http: bad request"}`)) + return err +} diff --git a/plugins/inputs/cl_kafka_subscriber/sample.conf b/plugins/inputs/cl_kafka_subscriber/sample.conf new file mode 100644 index 000000000..31285206d --- /dev/null +++ b/plugins/inputs/cl_kafka_subscriber/sample.conf @@ -0,0 +1,49 @@ +# Generic HTTP write listener +[[inputs.cl_kafka_subscriber]] + ## Address to host HTTP listener on + ## can be prefixed by protocol tcp, or unix if not provided defaults to tcp + ## if unix network type provided it should be followed by absolute path for unix socket + address = "tcp://:8877" + ## address = "tcp://:8443" + + ## Paths to listen to. + path="/api/subscribe" + + ## Kafka brokers. + brokers = ["localhost:9092"] + + ## maximum duration before timing out read of the request + # read_timeout = "10s" + ## maximum duration before timing out write of the response + # write_timeout = "10s" + + ## Optional username and password to accept for HTTP basic authentication. + ## You probably want to make sure you have TLS configured above for this. + # basic_username = "foobar" + # basic_password = "barfoo" + + ## Maximum allowed http request body size in bytes. + # max_body_size = "100MB" + + ## url to call. + call_url="/api/subscribe/feedback" + + ## maximum duration before timing out call http + # call_timeout = "10s" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "sample_binary" \ No newline at end of file diff --git a/plugins/inputs/cl_kafka_subscriber/subscriber_handler.go b/plugins/inputs/cl_kafka_subscriber/subscriber_handler.go new file mode 100644 index 000000000..20a070d2f --- /dev/null +++ b/plugins/inputs/cl_kafka_subscriber/subscriber_handler.go @@ -0,0 +1,16 @@ +package kafka_subscriber + +import ( + "context" + "sync" + + "github.com/influxdata/telegraf" +) + +type subscriberHandler struct { + acc telegraf.TrackingAccumulator + parser telegraf.Parser + wg sync.WaitGroup + cancel context.CancelFunc + log telegraf.Logger +}