feat(outputs.influxdb_v2): Support secrets in http_headers values (#16746)
This commit is contained in:
parent
8c5114b067
commit
73d092a566
|
|
@ -19,9 +19,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
## Secret-store support
|
## Secret-store support
|
||||||
|
|
||||||
This plugin supports secrets from secret-stores for the `token` option.
|
This plugin supports secrets from secret-stores for the `token` and
|
||||||
See the [secret-store documentation][SECRETSTORE] for more details on how
|
`http_headers` option. See the [secret-store documentation][SECRETSTORE] for
|
||||||
to use them.
|
more details on how to use them.
|
||||||
|
|
||||||
[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets
|
[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ type httpClient struct {
|
||||||
bucketTag string
|
bucketTag string
|
||||||
excludeBucketTag bool
|
excludeBucketTag bool
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
headers map[string]string
|
headers map[string]*config.Secret
|
||||||
proxy *url.URL
|
proxy *url.URL
|
||||||
userAgent string
|
userAgent string
|
||||||
contentEncoding string
|
contentEncoding string
|
||||||
|
|
@ -72,11 +72,12 @@ type httpClient struct {
|
||||||
|
|
||||||
func (c *httpClient) Init() error {
|
func (c *httpClient) Init() error {
|
||||||
if c.headers == nil {
|
if c.headers == nil {
|
||||||
c.headers = make(map[string]string, 1)
|
c.headers = make(map[string]*config.Secret, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := c.headers["User-Agent"]; !ok {
|
if _, ok := c.headers["User-Agent"]; !ok {
|
||||||
c.headers["User-Agent"] = c.userAgent
|
sec := config.NewSecret([]byte(c.userAgent))
|
||||||
|
c.headers["User-Agent"] = &sec
|
||||||
}
|
}
|
||||||
|
|
||||||
var proxy func(*http.Request) (*url.URL, error)
|
var proxy func(*http.Request) (*url.URL, error)
|
||||||
|
|
@ -279,7 +280,9 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
|
||||||
req.Header.Set("Authorization", "Token "+token.String())
|
req.Header.Set("Authorization", "Token "+token.String())
|
||||||
token.Destroy()
|
token.Destroy()
|
||||||
|
|
||||||
c.addHeaders(req)
|
if err := c.addHeaders(req); err != nil {
|
||||||
|
return fmt.Errorf("adding headers failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Execute the request
|
// Execute the request
|
||||||
c.rateLimiter.Accept(ratets, used)
|
c.rateLimiter.Accept(ratets, used)
|
||||||
|
|
@ -395,14 +398,23 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
|
||||||
return time.Duration(retry*1000) * time.Millisecond
|
return time.Duration(retry*1000) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) addHeaders(req *http.Request) {
|
func (c *httpClient) addHeaders(req *http.Request) error {
|
||||||
for header, value := range c.headers {
|
for header, value := range c.headers {
|
||||||
|
secret, err := value.Get()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
headerVal := secret.String()
|
||||||
|
secret.Destroy()
|
||||||
if strings.EqualFold(header, "host") {
|
if strings.EqualFold(header, "host") {
|
||||||
req.Host = value
|
req.Host = headerVal
|
||||||
} else {
|
} else {
|
||||||
req.Header.Set(header, value)
|
req.Header.Set(header, headerVal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeWriteURL(loc url.URL, params url.Values, bucket string) string {
|
func makeWriteURL(loc url.URL, params url.Values, bucket string) string {
|
||||||
|
|
|
||||||
|
|
@ -190,17 +190,19 @@ func TestExponentialBackoffCalculationWithRetryAfter(t *testing.T) {
|
||||||
func TestHeadersDoNotOverrideConfig(t *testing.T) {
|
func TestHeadersDoNotOverrideConfig(t *testing.T) {
|
||||||
testURL, err := url.Parse("https://localhost:8181")
|
testURL, err := url.Parse("https://localhost:8181")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
authHeader := config.NewSecret([]byte("Bearer foo"))
|
||||||
|
userAgentHeader := config.NewSecret([]byte("foo"))
|
||||||
c := &httpClient{
|
c := &httpClient{
|
||||||
headers: map[string]string{
|
headers: map[string]*config.Secret{
|
||||||
"Authorization": "Bearer foo",
|
"Authorization": &authHeader,
|
||||||
"User-Agent": "foo",
|
"User-Agent": &userAgentHeader,
|
||||||
},
|
},
|
||||||
// URL to make Init() happy
|
// URL to make Init() happy
|
||||||
url: testURL,
|
url: testURL,
|
||||||
}
|
}
|
||||||
require.NoError(t, c.Init())
|
require.NoError(t, c.Init())
|
||||||
require.Equal(t, "Bearer foo", c.headers["Authorization"])
|
require.Equal(t, &authHeader, c.headers["Authorization"])
|
||||||
require.Equal(t, "foo", c.headers["User-Agent"])
|
require.Equal(t, &userAgentHeader, c.headers["User-Agent"])
|
||||||
}
|
}
|
||||||
|
|
||||||
// goos: linux
|
// goos: linux
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ type InfluxDB struct {
|
||||||
BucketTag string `toml:"bucket_tag"`
|
BucketTag string `toml:"bucket_tag"`
|
||||||
ExcludeBucketTag bool `toml:"exclude_bucket_tag"`
|
ExcludeBucketTag bool `toml:"exclude_bucket_tag"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
HTTPHeaders map[string]string `toml:"http_headers"`
|
HTTPHeaders map[string]*config.Secret `toml:"http_headers"`
|
||||||
HTTPProxy string `toml:"http_proxy"`
|
HTTPProxy string `toml:"http_proxy"`
|
||||||
UserAgent string `toml:"user_agent"`
|
UserAgent string `toml:"user_agent"`
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
|
|
|
||||||
|
|
@ -64,13 +64,15 @@ func TestInit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var headerSecret = config.NewSecret([]byte("y"))
|
||||||
|
|
||||||
func TestConnectFail(t *testing.T) {
|
func TestConnectFail(t *testing.T) {
|
||||||
tests := []*influxdb.InfluxDB{
|
tests := []*influxdb.InfluxDB{
|
||||||
{
|
{
|
||||||
URLs: []string{"!@#$qwert"},
|
URLs: []string{"!@#$qwert"},
|
||||||
HTTPProxy: "http://localhost:8086",
|
HTTPProxy: "http://localhost:8086",
|
||||||
HTTPHeaders: map[string]string{
|
HTTPHeaders: map[string]*config.Secret{
|
||||||
"x": "y",
|
"x": &headerSecret,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
@ -78,8 +80,8 @@ func TestConnectFail(t *testing.T) {
|
||||||
|
|
||||||
URLs: []string{"http://localhost:1234"},
|
URLs: []string{"http://localhost:1234"},
|
||||||
HTTPProxy: "!@#$%^&*()_+",
|
HTTPProxy: "!@#$%^&*()_+",
|
||||||
HTTPHeaders: map[string]string{
|
HTTPHeaders: map[string]*config.Secret{
|
||||||
"x": "y",
|
"x": &headerSecret,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
@ -87,8 +89,8 @@ func TestConnectFail(t *testing.T) {
|
||||||
|
|
||||||
URLs: []string{"!@#$%^&*()_+"},
|
URLs: []string{"!@#$%^&*()_+"},
|
||||||
HTTPProxy: "http://localhost:8086",
|
HTTPProxy: "http://localhost:8086",
|
||||||
HTTPHeaders: map[string]string{
|
HTTPHeaders: map[string]*config.Secret{
|
||||||
"x": "y",
|
"x": &headerSecret,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
@ -96,8 +98,8 @@ func TestConnectFail(t *testing.T) {
|
||||||
|
|
||||||
URLs: []string{":::@#$qwert"},
|
URLs: []string{":::@#$qwert"},
|
||||||
HTTPProxy: "http://localhost:8086",
|
HTTPProxy: "http://localhost:8086",
|
||||||
HTTPHeaders: map[string]string{
|
HTTPHeaders: map[string]*config.Secret{
|
||||||
"x": "y",
|
"x": &headerSecret,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -115,8 +117,8 @@ func TestConnect(t *testing.T) {
|
||||||
{
|
{
|
||||||
URLs: []string{"http://localhost:1234"},
|
URLs: []string{"http://localhost:1234"},
|
||||||
HTTPProxy: "http://localhost:8086",
|
HTTPProxy: "http://localhost:8086",
|
||||||
HTTPHeaders: map[string]string{
|
HTTPHeaders: map[string]*config.Secret{
|
||||||
"x": "y",
|
"x": &headerSecret,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue