From a758311ac45df398e48ec8d384d478b6e748721c Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Fri, 7 Jun 2024 09:51:33 -0600 Subject: [PATCH] feat(outputs.influxdb_v2): Preserve custom query parameters on write (#15475) --- plugins/outputs/influxdb_v2/http.go | 33 ++++---- .../outputs/influxdb_v2/http_internal_test.go | 78 ++++++++++++++++++- 2 files changed, 94 insertions(+), 17 deletions(-) diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 4cfd19727..ec99af4ec 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -75,6 +75,7 @@ type httpClient struct { client *http.Client serializer *influx.Serializer url *url.URL + params url.Values retryTime time.Time retryCount int log telegraf.Logger @@ -157,13 +158,19 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) { return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme) } + preppedURL, params, err := prepareWriteURL(*cfg.URL, cfg.Organization) + if err != nil { + return nil, err + } + client := &httpClient{ serializer: serializer, client: &http.Client{ Timeout: timeout, Transport: transport, }, - url: cfg.URL, + url: preppedURL, + params: params, ContentEncoding: cfg.ContentEncoding, Timeout: timeout, Headers: headers, @@ -266,15 +273,10 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr } func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error { - loc, err := makeWriteURL(*c.url, c.Organization, bucket) - if err != nil { - return err - } - reader := c.requestBodyReader(metrics) defer reader.Close() - req, err := c.makeWriteRequest(loc, reader) + req, err := c.makeWriteRequest(makeWriteURL(*c.url, c.params, bucket), reader) if err != nil { return err } @@ -426,11 +428,13 @@ func (c *httpClient) addHeaders(req *http.Request) { } } -func makeWriteURL(loc url.URL, org, bucket string) (string, error) { - params := url.Values{} +func makeWriteURL(loc url.URL, params url.Values, bucket string) string { params.Set("bucket", bucket) - params.Set("org", org) + loc.RawQuery = params.Encode() + return loc.String() +} +func prepareWriteURL(loc url.URL, org string) (*url.URL, url.Values, error) { switch loc.Scheme { case "unix": loc.Scheme = "http" @@ -439,10 +443,13 @@ func makeWriteURL(loc url.URL, org, bucket string) (string, error) { case "http", "https": loc.Path = path.Join(loc.Path, "/api/v2/write") default: - return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) + return nil, nil, fmt.Errorf("unsupported scheme: %q", loc.Scheme) } - loc.RawQuery = params.Encode() - return loc.String(), nil + + params := loc.Query() + params.Set("org", org) + + return &loc, params, nil } func (c *httpClient) Close() { diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go index 96a11324c..e9d9bfe79 100644 --- a/plugins/outputs/influxdb_v2/http_internal_test.go +++ b/plugins/outputs/influxdb_v2/http_internal_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "net/url" + "path" "testing" "time" @@ -20,14 +21,26 @@ func TestMakeWriteURL(t *testing.T) { err bool url *url.URL act string + bkt string + org string }{ { url: genURL("http://localhost:9999"), - act: "http://localhost:9999/api/v2/write?bucket=telegraf&org=influx", + act: "http://localhost:9999/api/v2/write?bucket=telegraf0&org=influx0", + bkt: "telegraf0", + org: "influx0", + }, + { + url: genURL("http://localhost:9999?id=abc"), + act: "http://localhost:9999/api/v2/write?bucket=telegraf1&id=abc&org=influx1", + bkt: "telegraf1", + org: "influx1", }, { url: genURL("unix://var/run/influxd.sock"), - act: "http://127.0.0.1/api/v2/write?bucket=telegraf&org=influx", + act: "http://127.0.0.1/api/v2/write?bucket=telegraf2&org=influx2", + bkt: "telegraf2", + org: "influx2", }, { err: true, @@ -36,7 +49,7 @@ func TestMakeWriteURL(t *testing.T) { } for i := range tests { - rURL, err := makeWriteURL(*tests[i].url, "influx", "telegraf") + rURL, params, err := prepareWriteURL(*tests[i].url, tests[i].org) if !tests[i].err { require.NoError(t, err) } else { @@ -44,7 +57,9 @@ func TestMakeWriteURL(t *testing.T) { t.Log(err) } if err == nil { - require.Equal(t, tests[i].act, rURL) + for j := 0; j < 2; j++ { + require.Equal(t, tests[i].act, makeWriteURL(*rURL, params, tests[i].bkt)) + } } } } @@ -98,3 +113,58 @@ func TestExponentialBackoffCalculationWithRetryAfter(t *testing.T) { }) } } + +var ( + bucket = "bkt" + org = "org" + loc, params, _ = prepareWriteURL(*genURL("http://localhost:8086"), org) +) + +// goos: linux +// goarch: amd64 +// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 +// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz +// BenchmarkOldMakeWriteURL +// BenchmarkOldMakeWriteURL-16 1556631 683.2 ns/op 424 B/op 14 allocs/op +// PASS +// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.851s +func BenchmarkOldMakeWriteURL(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + oldMakeWriteURL(*loc, org, bucket) + } +} + +// goos: linux +// goarch: amd64 +// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 +// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz +// BenchmarkNewMakeWriteURL +// BenchmarkNewMakeWriteURL-16 2084415 496.5 ns/op 280 B/op 9 allocs/op +// PASS +// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.626s +func BenchmarkNewMakeWriteURL(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + makeWriteURL(*loc, params, bucket) + } +} + +func oldMakeWriteURL(loc url.URL, org, bucket string) (string, error) { + params := url.Values{} + params.Set("bucket", bucket) + params.Set("org", org) + + switch loc.Scheme { + case "unix": + loc.Scheme = "http" + loc.Host = "127.0.0.1" + loc.Path = "/api/v2/write" + case "http", "https": + loc.Path = path.Join(loc.Path, "/api/v2/write") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) + } + loc.RawQuery = params.Encode() + return loc.String(), nil +}