telegraf/plugins/inputs/cl_kafka_subscriber/kafka_subscriber.go

318 lines
7.1 KiB
Go

package kafka_subscriber
import (
"compress/gzip"
"crypto/subtle"
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"regexp"
"sync"
"time"
"github.com/IBM/sarama"
"github.com/golang/snappy"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/kafka"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
type KafkaSubscriber struct {
Address string `toml:"address"`
Path string `toml:"path"`
Brokers []string `toml:"brokers"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
MaxBodySize config.Size `toml:"max_body_size"`
CallURL string `toml:"call_url"`
CallTimeout config.Duration `toml:"call_timeout"`
Log telegraf.Logger `toml:"-"`
common_tls.ServerConfig
tlsConf *tls.Config
wg sync.WaitGroup
close chan struct{}
listener net.Listener
url *url.URL
client sarama.Client
consumer sarama.Consumer
httpClient *http.Client
parser telegraf.Parser
acc telegraf.Accumulator
}
func (*KafkaSubscriber) SampleConfig() string {
return sampleConfig
}
func (k *KafkaSubscriber) Init() error {
kafka.SetLogger(k.Log.Level())
tlsConf, err := k.ServerConfig.TLSConfig()
if err != nil {
return err
}
protoRegex := regexp.MustCompile(`\w://`)
if !protoRegex.MatchString(k.Address) {
k.Address = "tcp://" + k.Address
}
u, err := url.Parse(k.Address)
if err != nil {
return fmt.Errorf("parsing address failed: %w", err)
}
k.tlsConf = tlsConf
k.url = u
// TODO initialization
k.httpClient = &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
}
return nil
}
func (k *KafkaSubscriber) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KafkaSubscriber) Start(acc telegraf.Accumulator) error {
var listener net.Listener
var err error
if k.tlsConf != nil {
listener, err = tls.Listen(k.url.Scheme, k.url.Host, k.tlsConf)
} else {
listener, err = net.Listen(k.url.Scheme, k.url.Host)
}
if err != nil {
return err
}
k.listener = listener
if k.MaxBodySize == 0 {
k.MaxBodySize = config.Size(100 * 1024 * 1024)
}
if k.ReadTimeout < config.Duration(time.Second) {
k.ReadTimeout = config.Duration(time.Second * 10)
}
if k.WriteTimeout < config.Duration(time.Second) {
k.WriteTimeout = config.Duration(time.Second * 10)
}
if k.CallTimeout < config.Duration(time.Second) {
k.CallTimeout = config.Duration(time.Second * 10)
}
k.acc = acc
// TODO
server := k.createHTTPServer()
k.wg.Add(1)
go func() {
defer k.wg.Done()
if err := server.Serve(k.listener); err != nil {
if !errors.Is(err, net.ErrClosed) {
k.Log.Errorf("Serve failed: %v", err)
}
close(k.close)
}
}()
k.Log.Infof("Listening on %s", k.listener.Addr().String())
return nil
}
func (*KafkaSubscriber) Gather(telegraf.Accumulator) error {
return nil
}
func (k *KafkaSubscriber) Stop() {
if k.listener != nil {
k.listener.Close()
}
k.wg.Wait()
}
// ServeHTTP implements [http.Handler]
func (k *KafkaSubscriber) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if k.BasicUsername != "" && k.BasicPassword != "" {
reqUsername, reqPassword, ok := req.BasicAuth()
if !ok ||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(k.BasicUsername)) != 1 ||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(k.BasicPassword)) != 1 {
http.Error(res, "Unauthorized.", http.StatusUnauthorized)
return
}
}
if req.URL.Path != k.Path {
http.NotFound(res, req)
} else {
k.handle(res, req)
}
}
func (k *KafkaSubscriber) createHTTPServer() *http.Server {
return &http.Server{
Addr: k.Address,
Handler: k,
ReadTimeout: time.Duration(k.ReadTimeout),
WriteTimeout: time.Duration(k.WriteTimeout),
TLSConfig: k.tlsConf,
}
}
func (k *KafkaSubscriber) handle(res http.ResponseWriter, req *http.Request) {
select {
case <-k.close:
res.WriteHeader(http.StatusGone)
return
default:
}
if req.Method != http.MethodPost {
if err := methodNotAllowed(res); err != nil {
k.Log.Debugf("error in method-not-allowed: %v", err)
}
return
}
bytes, ok := k.collectBody(res, req)
if !ok {
return
}
// TODO
_ = bytes
}
func (h *KafkaSubscriber) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
encoding := req.Header.Get("Content-Encoding")
switch encoding {
case "gzip":
r, err := gzip.NewReader(req.Body)
if err != nil {
h.Log.Debug(err.Error())
if err := badRequest(res); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return nil, false
}
defer r.Close()
maxReader := http.MaxBytesReader(res, r, int64(h.MaxBodySize))
bytes, err := io.ReadAll(maxReader)
if err != nil {
if err := tooLarge(res); err != nil {
h.Log.Debugf("error in too-large: %v", err)
}
return nil, false
}
return bytes, true
case "snappy":
defer req.Body.Close()
bytes, err := io.ReadAll(req.Body)
if err != nil {
h.Log.Debug(err.Error())
if err := badRequest(res); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return nil, false
}
// snappy block format is only supported by decode/encode not snappy reader/writer
bytes, err = snappy.Decode(nil, bytes)
if err != nil {
h.Log.Debug(err.Error())
if err := badRequest(res); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return nil, false
}
return bytes, true
default:
defer req.Body.Close()
bytes, err := io.ReadAll(req.Body)
if err != nil {
h.Log.Debug(err.Error())
if err := badRequest(res); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return nil, false
}
return bytes, true
}
}
func (k *KafkaSubscriber) subscribe() error {
// TODO
return nil
}
type callRequest struct {
// TODO
}
func (k *KafkaSubscriber) call(req *callRequest) error {
// TODO
return nil
}
func init() {
inputs.Add("cl_kafka_subscriber", func() telegraf.Input {
return &KafkaSubscriber{
Address: ":8877",
close: make(chan struct{}),
}
})
}
func tooLarge(res http.ResponseWriter) error {
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusRequestEntityTooLarge)
_, err := res.Write([]byte(`{"error":"http: request body too large"}`))
return err
}
func methodNotAllowed(res http.ResponseWriter) error {
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusMethodNotAllowed)
_, err := res.Write([]byte(`{"error":"http: method not allowed"}`))
return err
}
func badRequest(res http.ResponseWriter) error {
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusBadRequest)
_, err := res.Write([]byte(`{"error":"http: bad request"}`))
return err
}