feat(outputs.influxdb_v2): Preserve custom query parameters on write (#15475)

This commit is contained in:
Greg 2024-06-07 09:51:33 -06:00 committed by GitHub
parent b79299975d
commit a758311ac4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 94 additions and 17 deletions

View File

@ -75,6 +75,7 @@ type httpClient struct {
client *http.Client client *http.Client
serializer *influx.Serializer serializer *influx.Serializer
url *url.URL url *url.URL
params url.Values
retryTime time.Time retryTime time.Time
retryCount int retryCount int
log telegraf.Logger log telegraf.Logger
@ -157,13 +158,19 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme) return nil, fmt.Errorf("unsupported scheme %q", cfg.URL.Scheme)
} }
preppedURL, params, err := prepareWriteURL(*cfg.URL, cfg.Organization)
if err != nil {
return nil, err
}
client := &httpClient{ client := &httpClient{
serializer: serializer, serializer: serializer,
client: &http.Client{ client: &http.Client{
Timeout: timeout, Timeout: timeout,
Transport: transport, Transport: transport,
}, },
url: cfg.URL, url: preppedURL,
params: params,
ContentEncoding: cfg.ContentEncoding, ContentEncoding: cfg.ContentEncoding,
Timeout: timeout, Timeout: timeout,
Headers: headers, Headers: headers,
@ -266,15 +273,10 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
} }
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error { func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
loc, err := makeWriteURL(*c.url, c.Organization, bucket)
if err != nil {
return err
}
reader := c.requestBodyReader(metrics) reader := c.requestBodyReader(metrics)
defer reader.Close() defer reader.Close()
req, err := c.makeWriteRequest(loc, reader) req, err := c.makeWriteRequest(makeWriteURL(*c.url, c.params, bucket), reader)
if err != nil { if err != nil {
return err return err
} }
@ -426,11 +428,13 @@ func (c *httpClient) addHeaders(req *http.Request) {
} }
} }
func makeWriteURL(loc url.URL, org, bucket string) (string, error) { func makeWriteURL(loc url.URL, params url.Values, bucket string) string {
params := url.Values{}
params.Set("bucket", bucket) params.Set("bucket", bucket)
params.Set("org", org) loc.RawQuery = params.Encode()
return loc.String()
}
func prepareWriteURL(loc url.URL, org string) (*url.URL, url.Values, error) {
switch loc.Scheme { switch loc.Scheme {
case "unix": case "unix":
loc.Scheme = "http" loc.Scheme = "http"
@ -439,10 +443,13 @@ func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
case "http", "https": case "http", "https":
loc.Path = path.Join(loc.Path, "/api/v2/write") loc.Path = path.Join(loc.Path, "/api/v2/write")
default: default:
return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) return nil, nil, fmt.Errorf("unsupported scheme: %q", loc.Scheme)
} }
loc.RawQuery = params.Encode()
return loc.String(), nil params := loc.Query()
params.Set("org", org)
return &loc, params, nil
} }
func (c *httpClient) Close() { func (c *httpClient) Close() {

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"path"
"testing" "testing"
"time" "time"
@ -20,14 +21,26 @@ func TestMakeWriteURL(t *testing.T) {
err bool err bool
url *url.URL url *url.URL
act string act string
bkt string
org string
}{ }{
{ {
url: genURL("http://localhost:9999"), url: genURL("http://localhost:9999"),
act: "http://localhost:9999/api/v2/write?bucket=telegraf&org=influx", act: "http://localhost:9999/api/v2/write?bucket=telegraf0&org=influx0",
bkt: "telegraf0",
org: "influx0",
},
{
url: genURL("http://localhost:9999?id=abc"),
act: "http://localhost:9999/api/v2/write?bucket=telegraf1&id=abc&org=influx1",
bkt: "telegraf1",
org: "influx1",
}, },
{ {
url: genURL("unix://var/run/influxd.sock"), url: genURL("unix://var/run/influxd.sock"),
act: "http://127.0.0.1/api/v2/write?bucket=telegraf&org=influx", act: "http://127.0.0.1/api/v2/write?bucket=telegraf2&org=influx2",
bkt: "telegraf2",
org: "influx2",
}, },
{ {
err: true, err: true,
@ -36,7 +49,7 @@ func TestMakeWriteURL(t *testing.T) {
} }
for i := range tests { for i := range tests {
rURL, err := makeWriteURL(*tests[i].url, "influx", "telegraf") rURL, params, err := prepareWriteURL(*tests[i].url, tests[i].org)
if !tests[i].err { if !tests[i].err {
require.NoError(t, err) require.NoError(t, err)
} else { } else {
@ -44,7 +57,9 @@ func TestMakeWriteURL(t *testing.T) {
t.Log(err) t.Log(err)
} }
if err == nil { if err == nil {
require.Equal(t, tests[i].act, rURL) for j := 0; j < 2; j++ {
require.Equal(t, tests[i].act, makeWriteURL(*rURL, params, tests[i].bkt))
}
} }
} }
} }
@ -98,3 +113,58 @@ func TestExponentialBackoffCalculationWithRetryAfter(t *testing.T) {
}) })
} }
} }
var (
bucket = "bkt"
org = "org"
loc, params, _ = prepareWriteURL(*genURL("http://localhost:8086"), org)
)
// goos: linux
// goarch: amd64
// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2
// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz
// BenchmarkOldMakeWriteURL
// BenchmarkOldMakeWriteURL-16 1556631 683.2 ns/op 424 B/op 14 allocs/op
// PASS
// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.851s
func BenchmarkOldMakeWriteURL(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
oldMakeWriteURL(*loc, org, bucket)
}
}
// goos: linux
// goarch: amd64
// pkg: github.com/influxdata/telegraf/plugins/outputs/influxdb_v2
// cpu: 11th Gen Intel(R) Core(TM) i7-11850H @ 2.50GHz
// BenchmarkNewMakeWriteURL
// BenchmarkNewMakeWriteURL-16 2084415 496.5 ns/op 280 B/op 9 allocs/op
// PASS
// ok github.com/influxdata/telegraf/plugins/outputs/influxdb_v2 1.626s
func BenchmarkNewMakeWriteURL(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
makeWriteURL(*loc, params, bucket)
}
}
func oldMakeWriteURL(loc url.URL, org, bucket string) (string, error) {
params := url.Values{}
params.Set("bucket", bucket)
params.Set("org", org)
switch loc.Scheme {
case "unix":
loc.Scheme = "http"
loc.Host = "127.0.0.1"
loc.Path = "/api/v2/write"
case "http", "https":
loc.Path = path.Join(loc.Path, "/api/v2/write")
default:
return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme)
}
loc.RawQuery = params.Encode()
return loc.String(), nil
}