[outputs.influxdb_v2] add exponential backoff, and respect client error responses (#8662)

* [outputs.influxdb_v2] add exponential backoff, and respect client error responses

* add test

* Update to 60 seconds

* fix test
Steven Soroka 2021-01-27 16:07:42 -05:00 committed by GitHub
parent cbe99ef596
commit 9c7cf99fa7
2 changed files with 94 additions and 27 deletions
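In short: writeBatch now counts consecutive failed writes in retryCount and, for retryable server errors, waits min(max(retryCount^2/40, Retry-After), 60) seconds before the next attempt, instead of trusting the Retry-After header alone. A minimal standalone sketch of that curve (the helper name retryWait is mine; the constant and formula come from the diff below):

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    const defaultMaxWait = 60 // seconds, per this commit

    // retryWait mirrors the backoff half of getRetryDuration below:
    // quadratic growth in the number of consecutive failures, capped.
    func retryWait(retryCount int) time.Duration {
        backoff := math.Pow(float64(retryCount), 2) / 40
        return time.Duration(math.Min(backoff, defaultMaxWait)) * time.Second
    }

    func main() {
        for _, n := range []int{1, 10, 30, 49, 100} {
            fmt.Printf("retry %3d -> wait %v\n", n, retryWait(n))
        }
        // retry   1 -> wait 0s
        // retry  10 -> wait 2s
        // retry  30 -> wait 22s
        // retry  49 -> wait 1m0s
        // retry 100 -> wait 1m0s
    }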

plugins/outputs/influxdb_v2/http.go

@@ -9,6 +9,7 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"math"
 	"net"
 	"net/http"
 	"net/url"
@@ -36,7 +37,7 @@ func (e APIError) Error() string {
 const (
 	defaultRequestTimeout = time.Second * 5
-	defaultMaxWait        = 10 // seconds
+	defaultMaxWait        = 60 // seconds
 	defaultDatabase       = "telegraf"
 )
@@ -70,6 +71,7 @@ type httpClient struct {
 	serializer *influx.Serializer
 	url        *url.URL
 	retryTime  time.Time
+	retryCount int
 }

 func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
@@ -233,7 +235,18 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	}
 	defer resp.Body.Close()

-	if resp.StatusCode == http.StatusNoContent {
+	switch resp.StatusCode {
+	case
+		// this is the expected response:
+		http.StatusNoContent,
+		// but if we get these we should still accept it as delivered:
+		http.StatusOK,
+		http.StatusCreated,
+		http.StatusAccepted,
+		http.StatusPartialContent,
+		http.StatusMultiStatus,
+		http.StatusAlreadyReported:
+		c.retryCount = 0
 		return nil
 	}
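Note the c.retryCount = 0 reset above: backoff only compounds across uninterrupted failures, and any accepted response starts the curve over. A toy trace of how the counter evolves across a mixed response sequence (illustrative statuses; the classification loosely mirrors the two switch statements in this diff):

    package main

    import "fmt"

    func main() {
        retryCount := 0
        for _, status := range []int{429, 429, 503, 204, 502} {
            switch {
            case status >= 200 && status < 300:
                retryCount = 0 // accepted as delivered: reset the backoff
            case status == 429 || status == 502 || status == 503 || status == 504:
                retryCount++ // server likely overloaded: back off harder
            }
            fmt.Printf("status %d -> retryCount %d\n", status, retryCount)
        }
        // status 429 -> retryCount 1
        // status 429 -> retryCount 2
        // status 503 -> retryCount 3
        // status 204 -> retryCount 0
        // status 502 -> retryCount 1
    }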
@@ -245,33 +258,37 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	}

 	switch resp.StatusCode {
-	case http.StatusBadRequest, http.StatusRequestEntityTooLarge:
-		log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc)
+	case
+		// request was malformed:
+		http.StatusBadRequest,
+		// request was too large:
+		http.StatusRequestEntityTooLarge,
+		// request was received but server refused to process it due to a semantic problem with the request.
+		// for example, submitting metrics outside the retention period.
+		// Clients should *not* repeat the request and the metrics should be dropped.
+		http.StatusUnprocessableEntity,
+		http.StatusNotAcceptable:
+		log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
 		return nil
 	case http.StatusUnauthorized, http.StatusForbidden:
-		return fmt.Errorf("failed to write metric: %s", desc)
-	case http.StatusTooManyRequests:
-		retryAfter := resp.Header.Get("Retry-After")
-		retry, err := strconv.Atoi(retryAfter)
-		if err != nil {
-			return errors.New("rate limit exceeded")
-		}
-		if retry > defaultMaxWait {
-			retry = defaultMaxWait
-		}
-		c.retryTime = time.Now().Add(time.Duration(retry) * time.Second)
-		return fmt.Errorf("waiting %ds for server before sending metric again", retry)
-	case http.StatusServiceUnavailable:
-		retryAfter := resp.Header.Get("Retry-After")
-		retry, err := strconv.Atoi(retryAfter)
-		if err != nil {
-			return errors.New("server responded: service unavailable")
-		}
-		if retry > defaultMaxWait {
-			retry = defaultMaxWait
-		}
-		c.retryTime = time.Now().Add(time.Duration(retry) * time.Second)
-		return fmt.Errorf("waiting %ds for server before sending metric again", retry)
+		return fmt.Errorf("failed to write metric (%s): %s", resp.Status, desc)
+	case http.StatusTooManyRequests,
+		http.StatusServiceUnavailable,
+		http.StatusBadGateway,
+		http.StatusGatewayTimeout:
+		// ^ these handle the cases where the server is likely overloaded, and may not be able to say so.
+		c.retryCount++
+		retryDuration := c.getRetryDuration(resp.Header)
+		c.retryTime = time.Now().Add(retryDuration)
+		log.Printf("W! [outputs.influxdb_v2] Failed to write; will retry in %s. (%s)\n", retryDuration, resp.Status)
+		return fmt.Errorf("waiting %s for server before sending metric again", retryDuration)
 	}

+	// if it's any other 4xx code, the client should not retry as it's the client's mistake.
+	// retrying will not make the request magically work.
+	if len(resp.Status) > 0 && resp.Status[0] == '4' {
+		log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
+		return nil
+	}
+
 	// This is only until platform spec is fully implemented. As of the
@@ -287,6 +304,29 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	}
 }

+// retryDuration takes the longer of the Retry-After header and our own back-off calculation
+func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
+	// basic exponential backoff (x^2)/40 (denominator to widen the slope)
+	// at 40 denominator, it takes 49 retries to hit the max defaultMaxWait of 60s
+	backoff := math.Pow(float64(c.retryCount), 2) / 40
+
+	// get any value from the header, if available
+	retryAfterHeader := float64(0)
+	retryAfterHeaderString := headers.Get("Retry-After")
+	if len(retryAfterHeaderString) > 0 {
+		var err error
+		retryAfterHeader, err = strconv.ParseFloat(retryAfterHeaderString, 64)
+		if err != nil {
+			// there was a value but we couldn't parse it? guess minimum 10 sec
+			retryAfterHeader = 10
+		}
+	}
+
+	// take the highest value from both, but not over the max wait.
+	retry := math.Max(backoff, retryAfterHeader)
+	retry = math.Min(retry, defaultMaxWait)
+	return time.Duration(retry) * time.Second
+}
+
 func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
 	var err error
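The header handling in getRetryDuration has three easy-to-miss branches: a parseable Retry-After wins only while it exceeds the computed backoff, an unparseable value falls back to a 10 second guess, and everything is capped at defaultMaxWait. A sketch of an internal test covering those branches (hypothetical; it would sit beside the existing tests in the next file and uses only names from this diff):

    func TestRetryAfterHeader(t *testing.T) {
        c := &httpClient{retryCount: 10} // backoff alone would be 2s
        h := http.Header{}

        h.Set("Retry-After", "15") // parseable and larger than the backoff: header wins
        require.EqualValues(t, 15*time.Second, c.getRetryDuration(h))

        h.Set("Retry-After", "junk") // present but unparseable: assume 10s
        require.EqualValues(t, 10*time.Second, c.getRetryDuration(h))

        h.Set("Retry-After", "300") // never wait longer than defaultMaxWait
        require.EqualValues(t, 60*time.Second, c.getRetryDuration(h))
    }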
plugins/outputs/influxdb_v2/http_internal_test.go

@@ -1,8 +1,11 @@
 package influxdb_v2

 import (
+	"fmt"
+	"net/http"
 	"net/url"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/require"
 )
@@ -45,3 +48,27 @@ func TestMakeWriteURL(t *testing.T) {
 		}
 	}
 }
+
+func TestExponentialBackoffCalculation(t *testing.T) {
+	c := &httpClient{}
+	tests := []struct {
+		retryCount int
+		expected   time.Duration
+	}{
+		{retryCount: 0, expected: 0},
+		{retryCount: 1, expected: 0},
+		{retryCount: 5, expected: 0},
+		{retryCount: 10, expected: 2 * time.Second},
+		{retryCount: 30, expected: 22 * time.Second},
+		{retryCount: 40, expected: 40 * time.Second},
+		{retryCount: 50, expected: 60 * time.Second},
+		{retryCount: 100, expected: 60 * time.Second},
+		{retryCount: 1000, expected: 60 * time.Second},
+	}
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("%d_retries", test.retryCount), func(t *testing.T) {
+			c.retryCount = test.retryCount
+			require.EqualValues(t, test.expected, c.getRetryDuration(http.Header{}))
+		})
+	}
+}
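To check the table above against the implementation, the new test runs standalone (the package path assumes Telegraf's usual layout for this plugin):

    go test -v -run TestExponentialBackoffCalculation ./plugins/outputs/influxdb_v2/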