feat(inputs.influxdb_v2_listener): Add support for rate limiting (#15361)

Lars Stegman 2024-05-16 15:15:52 +02:00 committed by GitHub
parent 5607934f2c
commit dcb6177263
4 changed files with 154 additions and 11 deletions

plugins/inputs/influxdb_v2_listener/README.md

@@ -40,6 +40,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
  ## (Double check the port. Could be 9999 if using OSS Beta)
  service_address = ":8086"

  ## Maximum undelivered metrics before rate limit kicks in.
  ## When the rate limit kicks in, HTTP status 429 will be returned.
  ## 0 disables rate limiting
  # max_undelivered_metrics = 0

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response
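For illustration, a listener with the new option enabled might be configured as follows; the limit of 10000 is an arbitrary example value, not a default:

[[inputs.influxdb_v2_listener]]
  service_address = ":8086"
  ## Reject further writes with HTTP 429 while 10000 metrics are still
  ## buffered and not yet delivered to the outputs.
  max_undelivered_metrics = 10000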

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go

@@ -13,6 +13,8 @@ import (
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/telegraf"
@@ -52,19 +54,27 @@ type InfluxDBV2Listener struct {
	port int
	tlsint.ServerConfig
	ReadTimeout  config.Duration `toml:"read_timeout"`
	WriteTimeout config.Duration `toml:"write_timeout"`
	MaxBodySize  config.Size     `toml:"max_body_size"`
	Token        string          `toml:"token"`
	BucketTag    string          `toml:"bucket_tag"`
	ParserType   string          `toml:"parser_type"`
	MaxUndeliveredMetrics int             `toml:"max_undelivered_metrics"`
	ReadTimeout           config.Duration `toml:"read_timeout"`
	WriteTimeout          config.Duration `toml:"write_timeout"`
	MaxBodySize           config.Size     `toml:"max_body_size"`
	Token                 string          `toml:"token"`
	BucketTag             string          `toml:"bucket_tag"`
	ParserType            string          `toml:"parser_type"`
	ctx    context.Context
	cancel context.CancelFunc
	trackingMetricCount     map[telegraf.TrackingID]int64
	countLock               sync.Mutex
	totalUndeliveredMetrics atomic.Int64
	timeFunc influx.TimeFunc
	listener net.Listener
	server   http.Server
	acc telegraf.Accumulator
	acc         telegraf.Accumulator
	trackingAcc telegraf.TrackingAccumulator
	bytesRecv      selfstat.Stat
	requestsServed selfstat.Stat
@@ -135,6 +145,26 @@ func (h *InfluxDBV2Listener) Init() error {
// Start starts the InfluxDB listener service.
func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
	h.acc = acc
	h.ctx, h.cancel = context.WithCancel(context.Background())
	if h.MaxUndeliveredMetrics > 0 {
		h.trackingAcc = h.acc.WithTracking(h.MaxUndeliveredMetrics)
		h.trackingMetricCount = make(map[telegraf.TrackingID]int64, h.MaxUndeliveredMetrics)
		go func() {
			for {
				select {
				case <-h.ctx.Done():
					return
				case info := <-h.trackingAcc.Delivered():
					if count, ok := h.trackingMetricCount[info.ID()]; ok {
						h.countLock.Lock()
						h.totalUndeliveredMetrics.Add(-count)
						delete(h.trackingMetricCount, info.ID())
						h.countLock.Unlock()
					}
				}
			}
		}()
	}
	tlsConf, err := h.ServerConfig.TLSConfig()
	if err != nil {
@@ -180,6 +210,7 @@ func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
// Stop cleans up all resources
func (h *InfluxDBV2Listener) Stop() {
	h.cancel()
	err := h.server.Shutdown(context.Background())
	if err != nil {
		h.Log.Infof("Error shutting down HTTP server: %v", err.Error())
@@ -219,6 +250,7 @@ func (h *InfluxDBV2Listener) handleDefault() http.HandlerFunc {
func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
	return func(res http.ResponseWriter, req *http.Request) {
		defer h.writesServed.Incr(1)
		// Check that the content length is not too large for us to handle.
		if req.ContentLength > int64(h.MaxBodySize) {
			if err := tooLarge(res, int64(h.MaxBodySize)); err != nil {
@@ -308,15 +340,50 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
			if h.BucketTag != "" && bucket != "" {
				m.AddTag(h.BucketTag, bucket)
			}
			h.acc.AddMetric(m)
		}
		// http request success
		res.WriteHeader(http.StatusNoContent)
		if h.MaxUndeliveredMetrics > 0 {
			h.writeWithTracking(res, metrics)
		} else {
			h.write(res, metrics)
		}
	}
}

func (h *InfluxDBV2Listener) writeWithTracking(res http.ResponseWriter, metrics []telegraf.Metric) {
	if len(metrics) > h.MaxUndeliveredMetrics {
		res.WriteHeader(http.StatusRequestEntityTooLarge)
		h.Log.Debugf("status %d, always rejecting batch of %d metrics: larger than max_undelivered_metrics %d",
			http.StatusRequestEntityTooLarge, len(metrics), h.MaxUndeliveredMetrics)
		return
	}
	pending := h.totalUndeliveredMetrics.Load()
	remainingUndeliveredMetrics := int64(h.MaxUndeliveredMetrics) - pending
	if int64(len(metrics)) > remainingUndeliveredMetrics {
		res.WriteHeader(http.StatusTooManyRequests)
		h.Log.Debugf("status %d, rejecting batch of %d metrics: larger than remaining undelivered metrics %d",
			http.StatusTooManyRequests, len(metrics), remainingUndeliveredMetrics)
		return
	}
	h.countLock.Lock()
	trackingID := h.trackingAcc.AddTrackingMetricGroup(metrics)
	h.trackingMetricCount[trackingID] = int64(len(metrics))
	h.totalUndeliveredMetrics.Add(int64(len(metrics)))
	h.countLock.Unlock()
	res.WriteHeader(http.StatusNoContent)
}

func (h *InfluxDBV2Listener) write(res http.ResponseWriter, metrics []telegraf.Metric) {
	for _, m := range metrics {
		h.acc.AddMetric(m)
	}
	res.WriteHeader(http.StatusNoContent)
}

func tooLarge(res http.ResponseWriter, maxLength int64) error {
	res.Header().Set("Content-Type", "application/json")
	res.Header().Set("X-Influxdb-Error", "http: request body too large")

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go

@@ -67,6 +67,12 @@ func newTestAuthListener() *InfluxDBV2Listener {
	return listener
}

func newRateLimitedTestListener(maxUndeliveredMetrics int) *InfluxDBV2Listener {
	listener := newTestListener()
	listener.MaxUndeliveredMetrics = maxUndeliveredMetrics
	return listener
}

func newTestSecureListener() *InfluxDBV2Listener {
	listener := &InfluxDBV2Listener{
		Log: testutil.Logger{},
@@ -599,4 +605,64 @@ func TestWriteWithPrecisionNoTimestamp(t *testing.T) {
	require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time)
}

func TestRateLimitedConnectionDropsSecondRequest(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\n"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)

	resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 429, resp.StatusCode)
}

func TestRateLimitedConnectionAcceptsNewRequestOnDelivery(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\n"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)

	ms := acc.GetTelegrafMetrics()
	for _, m := range ms {
		m.Accept()
	}

	resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)
}

func TestRateLimitedConnectionRejectsBatchesLargerThanMaxUndeliveredMetrics(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\nxyzzy value=43"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 413, resp.StatusCode)
}
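The tests above exercise a simple back-pressure protocol for clients: 204 means the write was accepted, 429 means the undelivered-metric budget is currently exhausted and the write can be retried later, and 413 means the batch can never fit within max_undelivered_metrics and must be split. A minimal client-side sketch (the helper, endpoint URL, and retry policy are only illustrative, not part of this commit):

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// postWithRetry posts a line-protocol payload to the listener. It backs off
// and retries on HTTP 429 (rate limit active) and gives up immediately on
// HTTP 413 (batch larger than max_undelivered_metrics, retrying cannot help).
func postWithRetry(url string, payload []byte, attempts int) error {
	for i := 0; i < attempts; i++ {
		resp, err := http.Post(url, "text/plain; charset=utf-8", bytes.NewReader(payload))
		if err != nil {
			return err
		}
		resp.Body.Close()

		switch resp.StatusCode {
		case http.StatusNoContent: // 204: write accepted
			return nil
		case http.StatusTooManyRequests: // 429: undelivered-metric budget exhausted, back off
			time.Sleep(time.Duration(i+1) * time.Second)
		case http.StatusRequestEntityTooLarge: // 413: split the batch instead of retrying
			return fmt.Errorf("batch exceeds max_undelivered_metrics, split it")
		default:
			return fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
	}
	return fmt.Errorf("still rate limited after %d attempts", attempts)
}

func main() {
	url := "http://localhost:8086/api/v2/write?bucket=mybucket&precision=s"
	if err := postWithRetry(url, []byte("cpu value=42\n"), 5); err != nil {
		fmt.Println("write failed:", err)
	}
}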

plugins/inputs/influxdb_v2_listener/sample.conf

@@ -4,6 +4,11 @@
  ## (Double check the port. Could be 9999 if using OSS Beta)
  service_address = ":8086"

  ## Maximum undelivered metrics before rate limit kicks in.
  ## When the rate limit kicks in, HTTP status 429 will be returned.
  ## 0 disables rate limiting
  # max_undelivered_metrics = 0

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response