From de6c2f74d68b0588311809a367e3982e8268bcfe Mon Sep 17 00:00:00 2001
From: Joshua Powers
Date: Wed, 24 Nov 2021 12:03:55 -0700
Subject: [PATCH] feat: add retry to 413 errors with InfluxDB output (#10130)

---
 plugins/outputs/influxdb_v2/http.go      | 33 +++++++-
 plugins/outputs/influxdb_v2/http_test.go | 98 ++++++++++++++++++++++++
 2 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go
index c07658025..098ebd9dd 100644
--- a/plugins/outputs/influxdb_v2/http.go
+++ b/plugins/outputs/influxdb_v2/http.go
@@ -177,6 +177,12 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 	if c.BucketTag == "" {
 		err := c.writeBatch(ctx, c.Bucket, metrics)
 		if err != nil {
+			if err, ok := err.(*APIError); ok {
+				if err.StatusCode == http.StatusRequestEntityTooLarge {
+					return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
+				}
+			}
+
 			return err
 		}
 	} else {
@@ -203,6 +209,12 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 		for bucket, batch := range batches {
 			err := c.writeBatch(ctx, bucket, batch)
 			if err != nil {
+				if apiErr, ok := err.(*APIError); ok && apiErr.StatusCode == http.StatusRequestEntityTooLarge {
+					// Retry only this bucket's batch; the remaining buckets still need writing.
+					err = c.splitAndWriteBatch(ctx, bucket, batch)
+				}
+			}
+			if err != nil {
 				return err
 			}
 		}
@@ -210,6 +222,17 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 	return nil
 }
 
+// splitAndWriteBatch logs a warning, then writes the first and the second half
+// of metrics to bucket as two requests, returning the first error encountered.
+func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
+	log.Printf("W! [outputs.influxdb_v2] Retrying write after splitting metric payload in half to reduce batch size")
+	midpoint := len(metrics) / 2
+	if err := c.writeBatch(ctx, bucket, metrics[:midpoint]); err != nil {
+		return err
+	}
+	return c.writeBatch(ctx, bucket, metrics[midpoint:])
+}
+
 func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
 	loc, err := makeWriteURL(*c.url, c.Organization, bucket)
 	if err != nil {
@@ -257,11 +280,17 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	}
 
 	switch resp.StatusCode {
+	// request was too large, send back to try again
+	case http.StatusRequestEntityTooLarge:
+		log.Printf("E! [outputs.influxdb_v2] Failed to write metric, request was too large (413)")
+		return &APIError{
+			StatusCode:  resp.StatusCode,
+			Title:       resp.Status,
+			Description: desc,
+		}
 	case
 		// request was malformed:
 		http.StatusBadRequest,
-		// request was too large:
-		http.StatusRequestEntityTooLarge,
 		// request was received but server refused to process it due to a semantic problem with the request.
 		// for example, submitting metrics outside the retention period.
 		// Clients should *not* repeat the request and the metrics should be dropped.
diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go
index 0637cd806..e44729eec 100644
--- a/plugins/outputs/influxdb_v2/http_test.go
+++ b/plugins/outputs/influxdb_v2/http_test.go
@@ -111,3 +111,101 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) {
 	err = client.Write(ctx, metrics)
 	require.NoError(t, err)
 }
+
+func TestTooLargeWriteRetry(t *testing.T) {
+	ts := httptest.NewServer(
+		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			switch r.URL.Path {
+			case "/api/v2/write":
+				// The handler runs outside the test goroutine, where require's
+				// FailNow is not allowed; report with t.Error and answer 500.
+				if err := r.ParseForm(); err != nil {
+					t.Error(err)
+					w.WriteHeader(http.StatusInternalServerError)
+					return
+				}
+				body, err := io.ReadAll(r.Body)
+				if err != nil {
+					t.Error(err)
+					w.WriteHeader(http.StatusInternalServerError)
+					return
+				}
+
+				// Ensure metric body size is small
+				if len(body) > 16 {
+					w.WriteHeader(http.StatusRequestEntityTooLarge)
+				} else {
+					w.WriteHeader(http.StatusNoContent)
+				}
+			default:
+				w.WriteHeader(http.StatusNotFound)
+			}
+		}),
+	)
+	defer ts.Close()
+
+	config := &influxdb.HTTPConfig{
+		URL:              &url.URL{Scheme: "http", Host: ts.Listener.Addr().String()},
+		Bucket:           "telegraf",
+		BucketTag:        "bucket",
+		ExcludeBucketTag: true,
+	}
+
+	client, err := influxdb.NewHTTPClient(config)
+	require.NoError(t, err)
+
+	// Together the metric batch size is too big, split up, we get success
+	metrics := []telegraf.Metric{
+		testutil.MustMetric(
+			"cpu",
+			map[string]string{
+				"bucket": "foo",
+			},
+			map[string]interface{}{
+				"value": 42.0,
+			},
+			time.Unix(0, 0),
+		),
+		testutil.MustMetric(
+			"cpu",
+			map[string]string{
+				"bucket": "bar",
+			},
+			map[string]interface{}{
+				"value": 99.0,
+			},
+			time.Unix(0, 0),
+		),
+	}
+
+	ctx := context.Background()
+	err = client.Write(ctx, metrics)
+	require.NoError(t, err)
+
+	// These metrics are too big, even after splitting in half, expect error
+	hugeMetrics := []telegraf.Metric{
+		testutil.MustMetric(
+			"reallyLargeMetric",
+			map[string]string{
+				"bucket": "foobar",
+			},
+			map[string]interface{}{
+				"value": 123.456,
+			},
+			time.Unix(0, 0),
+		),
+		testutil.MustMetric(
+			"evenBiggerMetric",
+			map[string]string{
+				"bucket": "fizzbuzzbang",
+			},
+			map[string]interface{}{
+				"value": 999.999,
+			},
+			time.Unix(0, 0),
+		),
+	}
+
+	err = client.Write(ctx, hugeMetrics)
+	require.Error(t, err)
+}