feat: add retry to 413 errors with InfluxDB output (#10130)

This commit is contained in:
Joshua Powers 2021-11-24 12:03:55 -07:00 committed by GitHub
parent 79e479c691
commit de6c2f74d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 2 deletions

View File

@ -177,6 +177,12 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
if c.BucketTag == "" {
err := c.writeBatch(ctx, c.Bucket, metrics)
if err != nil {
if err, ok := err.(*APIError); ok {
if err.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
}
}
return err
}
} else {
@ -203,6 +209,12 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err != nil {
if err, ok := err.(*APIError); ok {
if err.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
}
}
return err
}
}
@ -210,6 +222,17 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
return nil
}
func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
log.Printf("W! [outputs.influxdb_v2] Retrying write after splitting metric payload in half to reduce batch size")
midpoint := len(metrics) / 2
if err := c.writeBatch(ctx, bucket, metrics[:midpoint]); err != nil {
return err
}
return c.writeBatch(ctx, bucket, metrics[midpoint:])
}
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
loc, err := makeWriteURL(*c.url, c.Organization, bucket)
if err != nil {
@ -257,11 +280,17 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
}
switch resp.StatusCode {
// request was too large, send back to try again
case http.StatusRequestEntityTooLarge:
log.Printf("E! [outputs.influxdb_v2] Failed to write metric, request was too large (413)")
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
}
case
// request was malformed:
http.StatusBadRequest,
// request was too large:
http.StatusRequestEntityTooLarge,
// request was received but server refused to process it due to a semantic problem with the request.
// for example, submitting metrics outside the retention period.
// Clients should *not* repeat the request and the metrics should be dropped.

View File

@ -111,3 +111,101 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) {
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
func TestTooLargeWriteRetry(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/write":
err := r.ParseForm()
require.NoError(t, err)
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
// Ensure metric body size is small
if len(body) > 16 {
w.WriteHeader(http.StatusRequestEntityTooLarge)
} else {
w.WriteHeader(http.StatusNoContent)
}
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()
addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}
config := &influxdb.HTTPConfig{
URL: addr,
Bucket: "telegraf",
BucketTag: "bucket",
ExcludeBucketTag: true,
}
client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)
// Together the metric batch size is too big, split up, we get success
metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "bar",
},
map[string]interface{}{
"value": 99.0,
},
time.Unix(0, 0),
),
}
ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
// These metrics are too big, even after splitting in half, expect error
hugeMetrics := []telegraf.Metric{
testutil.MustMetric(
"reallyLargeMetric",
map[string]string{
"bucket": "foobar",
},
map[string]interface{}{
"value": 123.456,
},
time.Unix(0, 0),
),
testutil.MustMetric(
"evenBiggerMetric",
map[string]string{
"bucket": "fizzbuzzbang",
},
map[string]interface{}{
"value": 999.999,
},
time.Unix(0, 0),
),
}
err = client.Write(ctx, hugeMetrics)
require.Error(t, err)
}