fix(outputs.influxdb_v2): Fix panic and API error handling (#16388)

parent 1de8e7afbe
commit 6ea22bdd83
@@ -26,16 +26,17 @@ import (
 )
 
 type APIError struct {
-	StatusCode  int
-	Title       string
-	Description string
+	Err        error
+	StatusCode int
+	Retryable  bool
 }
 
 func (e APIError) Error() string {
-	if e.Description != "" {
-		return fmt.Sprintf("%s: %s", e.Title, e.Description)
-	}
-
-	return e.Title
+	return e.Err.Error()
+}
+
+func (e APIError) Unwrap() error {
+	return e.Err
 }
 
 const (
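
Since APIError now wraps an underlying error and exposes Unwrap(), callers can rely on the standard errors helpers instead of matching message strings. A minimal standalone sketch, not part of the commit (the type mirrors the diff above, the main function is illustrative only):

// Sketch (not part of the commit): inspecting the reworked APIError with the
// standard errors helpers.
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// APIError mirrors the struct introduced in the hunk above.
type APIError struct {
	Err        error
	StatusCode int
	Retryable  bool
}

func (e APIError) Error() string { return e.Err.Error() }
func (e APIError) Unwrap() error { return e.Err }

func main() {
	// Wrap the API error once more, as a caller higher up the stack might.
	err := fmt.Errorf("writing batch: %w", &APIError{
		Err:        errors.New("503 Service Unavailable: temporarily overloaded"),
		StatusCode: http.StatusServiceUnavailable,
		Retryable:  true,
	})

	var apiErr *APIError
	if errors.As(err, &apiErr) {
		// Branch on structured fields instead of parsing the message.
		fmt.Println(apiErr.StatusCode, apiErr.Retryable, errors.Unwrap(apiErr))
	}
}
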
@@ -185,7 +186,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
 			}
 
 			batches[bucket] = append(batches[bucket], metric)
-			batchIndices[c.bucket] = append(batchIndices[c.bucket], i)
+			batchIndices[bucket] = append(batchIndices[bucket], i)
 		}
 	}
 
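
The one-line change above keeps the index bookkeeping keyed by the same per-metric bucket as the batches themselves; with the old c.bucket key, the accepted/rejected index lists could point at the wrong metrics whenever a bucket tag routed metrics away from the default bucket. A condensed sketch of that grouping pattern, using simplified stand-in types rather than the plugin's real ones:

// Sketch (not part of the commit): group inputs per bucket and remember the
// original slice index under the same bucket key, so a later partial failure
// can report exactly which inputs belong to the failed batch.
package main

import "fmt"

// metricStub is a simplified stand-in for telegraf.Metric carrying only the
// routing tag value used for batching.
type metricStub struct {
	bucket string
	value  float64
}

func main() {
	input := []metricStub{
		{bucket: "foo", value: 0},
		{bucket: "my_bucket", value: 42},
		{bucket: "my_bucket", value: 43},
		{bucket: "foo", value: 0},
	}

	batches := make(map[string][]metricStub)
	batchIndices := make(map[string][]int)
	for i, m := range input {
		bucket := m.bucket
		batches[bucket] = append(batches[bucket], m)
		// Key the index list by the per-metric bucket, not by a fixed
		// default bucket, so accept/reject bookkeeping matches the batch.
		batchIndices[bucket] = append(batchIndices[bucket], i)
	}

	fmt.Println(batchIndices["my_bucket"]) // [1 2]
	fmt.Println(batchIndices["foo"])       // [0 3]
}
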
@@ -201,10 +202,14 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
 		var apiErr *APIError
 		if errors.As(err, &apiErr) {
 			if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
+				// TODO: Need a testcase to verify rejected metrics are not retried...
 				return c.splitAndWriteBatch(ctx, c.bucket, metrics)
 			}
 			wErr.Err = err
-			wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
+			if !apiErr.Retryable {
+				wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
+			}
+			// TODO: Clarify if we should continue here to try the remaining buckets?
 			return &wErr
 		}
 
@@ -301,11 +306,10 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
 	}
 
 	// We got an error and now try to decode further
+	var desc string
 	writeResp := &genericRespError{}
-	err = json.NewDecoder(resp.Body).Decode(writeResp)
-	desc := writeResp.Error()
-	if err != nil {
-		desc = resp.Status
+	if json.NewDecoder(resp.Body).Decode(writeResp) == nil {
+		desc = ": " + writeResp.Error()
 	}
 
 	switch resp.StatusCode {
@@ -313,9 +317,8 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
 	case http.StatusRequestEntityTooLarge:
 		c.log.Errorf("Failed to write metric to %s, request was too large (413)", bucket)
 		return &APIError{
-			StatusCode:  resp.StatusCode,
-			Title:       resp.Status,
-			Description: desc,
+			Err:        fmt.Errorf("%s: %s", resp.Status, desc),
+			StatusCode: resp.StatusCode,
 		}
 	case
 		// request was malformed:
@@ -325,17 +328,13 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
 		http.StatusUnprocessableEntity,
 		http.StatusNotAcceptable:
 
-		// Clients should *not* repeat the request and the metrics should be dropped.
-		rejected := make([]int, 0, len(metrics))
-		for i := range len(metrics) {
-			rejected = append(rejected, i)
-		}
-		return &internal.PartialWriteError{
-			Err:           fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
-			MetricsReject: rejected,
+		// Clients should *not* repeat the request and the metrics should be rejected.
+		return &APIError{
+			Err:        fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
+			StatusCode: resp.StatusCode,
 		}
 	case http.StatusUnauthorized, http.StatusForbidden:
-		return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
+		return fmt.Errorf("failed to write metric to %s (%s)%s", bucket, resp.Status, desc)
 	case http.StatusTooManyRequests,
 		http.StatusServiceUnavailable,
 		http.StatusBadGateway,
@@ -351,26 +350,22 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
 	// if it's any other 4xx code, the client should not retry as it's the client's mistake.
 	// retrying will not make the request magically work.
 	if len(resp.Status) > 0 && resp.Status[0] == '4' {
-		rejected := make([]int, 0, len(metrics))
-		for i := range len(metrics) {
-			rejected = append(rejected, i)
-		}
-		return &internal.PartialWriteError{
-			Err:           fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
-			MetricsReject: rejected,
+		return &APIError{
+			Err:        fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
+			StatusCode: resp.StatusCode,
 		}
 	}
 
 	// This is only until platform spec is fully implemented. As of the
 	// time of writing, there is no error body returned.
 	if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
-		desc = fmt.Sprintf("%s; %s", desc, xErr)
+		desc = fmt.Sprintf(": %s; %s", desc, xErr)
 	}
 
 	return &APIError{
-		StatusCode:  resp.StatusCode,
-		Title:       resp.Status,
-		Description: desc,
+		Err:        fmt.Errorf("failed to write metric to bucket %q: %s%s", bucket, resp.Status, desc),
+		StatusCode: resp.StatusCode,
+		Retryable:  true,
 	}
 }
 
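
Read together, the writeBatch hunks above sort response codes into a few outcomes: 413 leads to splitting the batch, the explicitly listed 4xx codes and any other unhandled 4xx come back as a non-retryable APIError so the metrics get rejected, 401/403 remain a plain error, the 429/502/503/504 group keeps the existing retry-interval handling, and everything else becomes a retryable APIError. A rough standalone summary of that classification, as a hypothetical helper rather than code from the commit, with the behaviour inferred from the diff and the tests further down:

// Sketch (not part of the commit): a hypothetical helper summarizing how the
// patched writeBatch treats HTTP status codes.
package main

import (
	"fmt"
	"net/http"
)

func classify(status int) string {
	switch status {
	case http.StatusRequestEntityTooLarge:
		return "split the batch and retry the smaller parts"
	case http.StatusBadRequest, http.StatusUnprocessableEntity, http.StatusNotAcceptable:
		return "non-retryable APIError: metrics are rejected"
	case http.StatusUnauthorized, http.StatusForbidden:
		return "plain error: nothing rejected, write is retried"
	case http.StatusTooManyRequests, http.StatusServiceUnavailable,
		http.StatusBadGateway, http.StatusGatewayTimeout:
		return "server busy: wait for the retry interval, nothing rejected"
	}
	if status >= 400 && status < 500 {
		// Any other 4xx is the client's mistake; retrying will not help.
		return "other 4xx: non-retryable APIError, metrics are rejected"
	}
	return "anything else: retryable APIError, nothing rejected"
}

func main() {
	for _, code := range []int{413, 422, 404, 401, 429, 500} {
		fmt.Printf("%d -> %s\n", code, classify(code))
	}
}
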
@@ -199,11 +199,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 	for _, n := range rand.Perm(len(i.clients)) {
 		client := i.clients[n]
 		if err := client.Write(ctx, metrics); err != nil {
+			i.Log.Errorf("When writing to [%s]: %v", client.url, err)
 			var werr *internal.PartialWriteError
 			if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
 				return err
 			}
-			i.Log.Errorf("When writing to [%s]: %v", client.url, err)
 			continue
 		}
 		return nil
@@ -1,11 +1,13 @@
 package influxdb_v2_test
 
 import (
+	"fmt"
 	"io"
 	"net"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -486,3 +488,374 @@ func TestRateLimit(t *testing.T) {
 	require.NoError(t, plugin.Write(metrics[3:]))
 	require.Equal(t, uint64(121), received.Load())
 }
+
+func TestStatusCodeNonRetryable4xx(t *testing.T) {
+	codes := []int{
+		// Explicitly checked non-retryable status codes
+		http.StatusBadRequest, http.StatusUnprocessableEntity, http.StatusNotAcceptable,
+		// Other non-retryable 4xx status codes not explicitly checked
+		http.StatusNotFound, http.StatusExpectationFailed,
+	}
+
+	for _, code := range codes {
+		t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) {
+			// Setup a test server
+			ts := httptest.NewServer(
+				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+					body, err := io.ReadAll(r.Body)
+					if err != nil {
+						t.Error(err)
+						w.WriteHeader(http.StatusInternalServerError)
+						return
+					}
+					if strings.Contains(string(body), "bucket=foo") {
+						w.WriteHeader(http.StatusOK)
+						return
+					}
+					w.WriteHeader(code)
+				}),
+			)
+			defer ts.Close()
+
+			// Setup plugin and connect
+			plugin := &influxdb.InfluxDB{
+				URLs:      []string{"http://" + ts.Listener.Addr().String()},
+				BucketTag: "bucket",
+				Log:       &testutil.Logger{},
+			}
+			require.NoError(t, plugin.Init())
+			require.NoError(t, plugin.Connect())
+			defer plugin.Close()
+
+			// Together the metric batch size is too big, split up, we get success
+			metrics := []telegraf.Metric{
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 0),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 42.0,
+					},
+					time.Unix(0, 1),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 43.0,
+					},
+					time.Unix(0, 2),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 3),
+				),
+			}
+
+			// Write the metrics the first time and check for the expected errors
+			err := plugin.Write(metrics)
+			require.ErrorContains(t, err, "failed to write metric to my_bucket (will be dropped:")
+
+			var apiErr *influxdb.APIError
+			require.ErrorAs(t, err, &apiErr)
+			require.Equal(t, code, apiErr.StatusCode)
+
+			var writeErr *internal.PartialWriteError
+			require.ErrorAs(t, err, &writeErr)
+			require.Len(t, writeErr.MetricsReject, 2, "rejected metrics")
+		})
+	}
+}
+
+func TestStatusCodeInvalidAuthentication(t *testing.T) {
+	codes := []int{http.StatusUnauthorized, http.StatusForbidden}
+
+	for _, code := range codes {
+		t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) {
+			// Setup a test server
+			ts := httptest.NewServer(
+				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+					body, err := io.ReadAll(r.Body)
+					if err != nil {
+						t.Error(err)
+						w.WriteHeader(http.StatusInternalServerError)
+						return
+					}
+					if strings.Contains(string(body), "bucket=foo") {
+						w.WriteHeader(http.StatusOK)
+						return
+					}
+					w.WriteHeader(code)
+				}),
+			)
+			defer ts.Close()
+
+			// Setup plugin and connect
+			plugin := &influxdb.InfluxDB{
+				URLs:      []string{"http://" + ts.Listener.Addr().String()},
+				BucketTag: "bucket",
+				Log:       &testutil.Logger{},
+			}
+			require.NoError(t, plugin.Init())
+			require.NoError(t, plugin.Connect())
+			defer plugin.Close()
+
+			// Together the metric batch size is too big, split up, we get success
+			metrics := []telegraf.Metric{
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 0),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 42.0,
+					},
+					time.Unix(0, 1),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 43.0,
+					},
+					time.Unix(0, 2),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 3),
+				),
+			}
+
+			// Write the metrics the first time and check for the expected errors
+			err := plugin.Write(metrics)
+			require.ErrorContains(t, err, "failed to write metric to my_bucket")
+			require.ErrorContains(t, err, strconv.Itoa(code))
+
+			var writeErr *internal.PartialWriteError
+			require.ErrorAs(t, err, &writeErr)
+			require.Empty(t, writeErr.MetricsReject, "rejected metrics")
+			require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics")
+		})
+	}
+}
+
+func TestStatusCodeServiceUnavailable(t *testing.T) {
+	codes := []int{
+		http.StatusTooManyRequests,
+		http.StatusServiceUnavailable,
+		http.StatusBadGateway,
+		http.StatusGatewayTimeout,
+	}
+
+	for _, code := range codes {
+		t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) {
+			// Setup a test server
+			ts := httptest.NewServer(
+				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+					body, err := io.ReadAll(r.Body)
+					if err != nil {
+						t.Error(err)
+						w.WriteHeader(http.StatusInternalServerError)
+						return
+					}
+					if strings.Contains(string(body), "bucket=foo") {
+						w.WriteHeader(http.StatusOK)
+						return
+					}
+					w.WriteHeader(code)
+				}),
+			)
+			defer ts.Close()
+
+			// Setup plugin and connect
+			plugin := &influxdb.InfluxDB{
+				URLs:      []string{"http://" + ts.Listener.Addr().String()},
+				BucketTag: "bucket",
+				Log:       &testutil.Logger{},
+			}
+			require.NoError(t, plugin.Init())
+			require.NoError(t, plugin.Connect())
+			defer plugin.Close()
+
+			// Together the metric batch size is too big, split up, we get success
+			metrics := []telegraf.Metric{
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 0),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 42.0,
+					},
+					time.Unix(0, 1),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 43.0,
+					},
+					time.Unix(0, 2),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 3),
+				),
+			}
+
+			// Write the metrics the first time and check for the expected errors
+			err := plugin.Write(metrics)
+			require.ErrorContains(t, err, "waiting 25ms for server (my_bucket) before sending metric again")
+
+			var writeErr *internal.PartialWriteError
+			require.ErrorAs(t, err, &writeErr)
+			require.Empty(t, writeErr.MetricsReject, "rejected metrics")
+			require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics")
+		})
+	}
+}
+
+func TestStatusCodeUnexpected(t *testing.T) {
+	codes := []int{http.StatusInternalServerError}
+
+	for _, code := range codes {
+		t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) {
+			// Setup a test server
+			ts := httptest.NewServer(
+				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+					body, err := io.ReadAll(r.Body)
+					if err != nil {
+						t.Error(err)
+						w.WriteHeader(http.StatusInternalServerError)
+						return
+					}
+					if strings.Contains(string(body), "bucket=foo") {
+						w.WriteHeader(http.StatusOK)
+						return
+					}
+					w.WriteHeader(code)
+				}),
+			)
+			defer ts.Close()
+
+			// Setup plugin and connect
+			plugin := &influxdb.InfluxDB{
+				URLs:      []string{"http://" + ts.Listener.Addr().String()},
+				BucketTag: "bucket",
+				Log:       &testutil.Logger{},
+			}
+			require.NoError(t, plugin.Init())
+			require.NoError(t, plugin.Connect())
+			defer plugin.Close()
+
+			// Together the metric batch size is too big, split up, we get success
+			metrics := []telegraf.Metric{
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 0),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 42.0,
+					},
+					time.Unix(0, 1),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "my_bucket",
+					},
+					map[string]interface{}{
+						"value": 43.0,
+					},
+					time.Unix(0, 2),
+				),
+				metric.New(
+					"cpu",
+					map[string]string{
+						"bucket": "foo",
+					},
+					map[string]interface{}{
+						"value": 0.0,
+					},
+					time.Unix(0, 3),
+				),
+			}
+
+			// Write the metrics the first time and check for the expected errors
+			err := plugin.Write(metrics)
+			require.ErrorContains(t, err, "failed to write metric to bucket \"my_bucket\"")
+			require.ErrorContains(t, err, strconv.Itoa(code))
+
+			var writeErr *internal.PartialWriteError
+			require.ErrorAs(t, err, &writeErr)
+			require.Empty(t, writeErr.MetricsReject, "rejected metrics")
+			require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics")
+		})
+	}
+}