telegraf/plugins/outputs/influxdb_v2/http_test.go

284 lines
5.6 KiB
Go
Raw Normal View History

2018-09-13 05:48:59 +08:00
package influxdb_v2_test
import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
	"github.com/influxdata/telegraf/testutil"
)
// genURL parses u and returns the resulting URL for use in test configs.
//
// The parse error is deliberately discarded because callers only pass
// hard-coded, known-good test URLs; for an unparsable input the result is
// whatever url.Parse returns alongside its error (nil).
func genURL(u string) *url.URL {
	//nolint:errcheck // known test urls
	address, _ := url.Parse(u)
	return address
}
func TestNewHTTPClient(t *testing.T) {
tests := []struct {
err bool
cfg *influxdb.HTTPConfig
}{
{
err: true,
cfg: &influxdb.HTTPConfig{},
},
{
err: true,
cfg: &influxdb.HTTPConfig{
URL: genURL("udp://localhost:9999"),
2018-09-13 05:48:59 +08:00
},
},
{
cfg: &influxdb.HTTPConfig{
URL: genURL("unix://var/run/influxd.sock"),
},
},
{
cfg: &influxdb.HTTPConfig{
URL: genURL("unix://var/run/influxd.sock"),
PingTimeout: config.Duration(15 * time.Second),
ReadIdleTimeout: config.Duration(30 * time.Second),
},
},
2018-09-13 05:48:59 +08:00
}
for i := range tests {
client, err := influxdb.NewHTTPClient(tests[i].cfg)
if !tests[i].err {
require.NoError(t, err)
} else {
require.Error(t, err)
t.Log(err)
}
if err == nil {
client.URL()
}
}
}
func TestWrite(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/write":
err := r.ParseForm()
require.NoError(t, err)
require.Equal(t, []string{"foobar"}, r.Form["bucket"])
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42.123")
w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()
addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}
cfg := &influxdb.HTTPConfig{
URL: addr,
Bucket: "telegraf",
BucketTag: "bucket",
ExcludeBucketTag: true,
PingTimeout: config.Duration(15 * time.Second),
ReadIdleTimeout: config.Duration(30 * time.Second),
}
client, err := influxdb.NewHTTPClient(cfg)
require.NoError(t, err)
metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "foobar",
},
map[string]interface{}{
"value": 42.123,
},
time.Unix(0, 0),
),
}
ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
func TestWriteBucketTagWorksOnRetry(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/write":
err := r.ParseForm()
require.NoError(t, err)
require.Equal(t, []string{"foo"}, r.Form["bucket"])
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()
addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}
cfg := &influxdb.HTTPConfig{
URL: addr,
Bucket: "telegraf",
BucketTag: "bucket",
ExcludeBucketTag: true,
}
client, err := influxdb.NewHTTPClient(cfg)
require.NoError(t, err)
metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
}
ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
func TestTooLargeWriteRetry(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/write":
err := r.ParseForm()
require.NoError(t, err)
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
// Ensure metric body size is small
if len(body) > 16 {
w.WriteHeader(http.StatusRequestEntityTooLarge)
} else {
w.WriteHeader(http.StatusNoContent)
}
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
}),
)
defer ts.Close()
addr := &url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
}
cfg := &influxdb.HTTPConfig{
URL: addr,
Bucket: "telegraf",
BucketTag: "bucket",
ExcludeBucketTag: true,
2021-11-25 04:57:14 +08:00
Log: testutil.Logger{},
}
client, err := influxdb.NewHTTPClient(cfg)
require.NoError(t, err)
// Together the metric batch size is too big, split up, we get success
metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
testutil.MustMetric(
"cpu",
map[string]string{
"bucket": "bar",
},
map[string]interface{}{
"value": 99.0,
},
time.Unix(0, 0),
),
}
ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
// These metrics are too big, even after splitting in half, expect error
hugeMetrics := []telegraf.Metric{
testutil.MustMetric(
"reallyLargeMetric",
map[string]string{
"bucket": "foobar",
},
map[string]interface{}{
"value": 123.456,
},
time.Unix(0, 0),
),
testutil.MustMetric(
"evenBiggerMetric",
map[string]string{
"bucket": "fizzbuzzbang",
},
map[string]interface{}{
"value": 999.999,
},
time.Unix(0, 0),
),
}
err = client.Write(ctx, hugeMetrics)
require.Error(t, err)
}