Add OAuth2 to HTTP input (#9138)

* add oauth2 to http input

* linter fixes

* add http config to common plugin

* address linter changes

* Update README.md

* add log for user if fields are missing

* add correct logger

* alter output plugin as well

* fix formatting

* add oauth2 separate package

* fix package naming

* remove unnecessary logger
This commit is contained in:
David Bennett 2021-04-23 09:37:27 -04:00 committed by GitHub
parent 1bc87ccc3c
commit 8bb388584d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 203 additions and 77 deletions

View File

@ -0,0 +1,54 @@
package httpconfig
import (
"context"
"net/http"
"time"
"github.com/influxdata/telegraf/config"
oauthConfig "github.com/influxdata/telegraf/plugins/common/oauth"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/common/tls"
)
// Common HTTP client struct.
type HTTPClientConfig struct {
Timeout config.Duration `toml:"timeout"`
IdleConnTimeout config.Duration `toml:"idle_conn_timeout"`
proxy.HTTPProxy
tls.ClientConfig
oauthConfig.OAuth2Config
}
func (h *HTTPClientConfig) CreateClient(ctx context.Context) (*http.Client, error) {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
prox, err := h.HTTPProxy.Proxy()
if err != nil {
return nil, err
}
transport := &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: prox,
IdleConnTimeout: time.Duration(h.IdleConnTimeout),
}
timeout := h.Timeout
if timeout == 0 {
timeout = config.Duration(time.Second * 5)
}
client := &http.Client{
Transport: transport,
Timeout: time.Duration(timeout),
}
client = h.OAuth2Config.CreateOauth2Client(ctx, client)
return client, nil
}

View File

@ -0,0 +1,32 @@
package oauth
import (
"context"
"net/http"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
type OAuth2Config struct {
// OAuth2 Credentials
ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"`
}
func (o *OAuth2Config) CreateOauth2Client(ctx context.Context, client *http.Client) *http.Client {
if o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != "" {
oauthConfig := clientcredentials.Config{
ClientID: o.ClientID,
ClientSecret: o.ClientSecret,
TokenURL: o.TokenURL,
Scopes: o.Scopes,
}
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
client = oauthConfig.Client(ctx)
}
return client
}

View File

@ -34,6 +34,12 @@ The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The
# username = "username"
# password = "pa$$word"
## OAuth2 Client Credentials. The options 'client_id', 'client_secret', and 'token_url' are required to use OAuth2.
# client_id = "clientid"
# client_secret = "secret"
# token_url = "https://indentityprovider/oauth2/v1/token"
# scopes = ["urn:opc:idm:__myscopes__"]
## HTTP Proxy support
# http_proxy_url = ""

View File

@ -1,19 +1,17 @@
package http
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/common/tls"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
@ -29,18 +27,14 @@ type HTTP struct {
// HTTP Basic Auth Credentials
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
proxy.HTTPProxy
// Absolute path to file with Bearer token
BearerToken string `toml:"bearer_token"`
SuccessStatusCodes []int `toml:"success_status_codes"`
Timeout config.Duration `toml:"timeout"`
client *http.Client
httpconfig.HTTPClientConfig
// The parser will automatically be set by Telegraf core code because
// this plugin implements the ParserInput interface (i.e. the SetParser method)
@ -77,6 +71,12 @@ var sampleConfig = `
## HTTP Proxy support
# http_proxy_url = ""
## OAuth2 Client Credentials Grant
# client_id = "clientid"
# client_secret = "secret"
# token_url = "https://indentityprovider/oauth2/v1/token"
# scopes = ["urn:opc:idm:__myscopes__"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
@ -108,25 +108,13 @@ func (*HTTP) Description() string {
}
func (h *HTTP) Init() error {
tlsCfg, err := h.ClientConfig.TLSConfig()
ctx := context.Background()
client, err := h.HTTPClientConfig.CreateClient(ctx)
if err != nil {
return err
}
proxy, err := h.HTTPProxy.Proxy()
if err != nil {
return err
}
transport := &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxy,
}
h.client = &http.Client{
Transport: transport,
Timeout: time.Duration(h.Timeout),
}
h.client = client
// Set default as [200]
if len(h.SuccessStatusCodes) == 0 {
@ -262,8 +250,7 @@ func makeRequestBodyReader(contentEncoding, body string) (io.ReadCloser, error)
func init() {
inputs.Add("http", func() telegraf.Input {
return &HTTP{
Timeout: config.Duration(time.Second * 5),
Method: "GET",
Method: "GET",
}
})
}

View File

@ -6,8 +6,11 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
oauth "github.com/influxdata/telegraf/plugins/common/oauth"
plugin "github.com/influxdata/telegraf/plugins/inputs/http"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
@ -252,3 +255,82 @@ func TestBodyAndContentEncoding(t *testing.T) {
})
}
}
type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
func TestOAuthClientCredentialsGrant(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
var token = "2YotnFZFEjr1zCsicMWpAA"
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
plugin *plugin.HTTP
tokenHandler TestHandlerFunc
handler TestHandlerFunc
}{
{
name: "no credentials",
plugin: &plugin.HTTP{
URLs: []string{u.String()},
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Len(t, r.Header["Authorization"], 0)
w.WriteHeader(http.StatusOK)
},
},
{
name: "success",
plugin: &plugin.HTTP{
URLs: []string{u.String() + "/write"},
HTTPClientConfig: httpconfig.HTTPClientConfig{
OAuth2Config: oauth.OAuth2Config{
ClientID: "howdy",
ClientSecret: "secret",
TokenURL: u.String() + "/token",
Scopes: []string{"urn:opc:idm:__myscopes__"},
},
},
},
tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
values := url.Values{}
values.Add("access_token", token)
values.Add("token_type", "bearer")
values.Add("expires_in", "3600")
_, err := w.Write([]byte(values.Encode()))
require.NoError(t, err)
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"])
w.WriteHeader(http.StatusOK)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
tt.handler(t, w, r)
case "/token":
tt.tokenHandler(t, w, r)
}
})
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
tt.plugin.SetParser(parser)
err = tt.plugin.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err = tt.plugin.Gather(&acc)
require.NoError(t, err)
})
}
}

View File

@ -11,13 +11,11 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
const (
@ -80,18 +78,13 @@ const (
type HTTP struct {
URL string `toml:"url"`
Timeout config.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"`
ContentEncoding string `toml:"content_encoding"`
IdleConnTimeout config.Duration `toml:"idle_conn_timeout"`
tls.ClientConfig
httpconfig.HTTPClientConfig
client *http.Client
serializer serializers.Serializer
@ -101,35 +94,6 @@ func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}
func (h *HTTP) createClient(ctx context.Context) (*http.Client, error) {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
IdleConnTimeout: time.Duration(h.IdleConnTimeout),
},
Timeout: time.Duration(h.Timeout),
}
if h.ClientID != "" && h.ClientSecret != "" && h.TokenURL != "" {
oauthConfig := clientcredentials.Config{
ClientID: h.ClientID,
ClientSecret: h.ClientSecret,
TokenURL: h.TokenURL,
Scopes: h.Scopes,
}
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
client = oauthConfig.Client(ctx)
}
return client, nil
}
func (h *HTTP) Connect() error {
if h.Method == "" {
h.Method = http.MethodPost
@ -139,12 +103,8 @@ func (h *HTTP) Connect() error {
return fmt.Errorf("invalid method [%s] %s", h.URL, h.Method)
}
if h.Timeout == 0 {
h.Timeout = config.Duration(defaultClientTimeout)
}
ctx := context.Background()
client, err := h.createClient(ctx)
client, err := h.HTTPClientConfig.CreateClient(ctx)
if err != nil {
return err
}
@ -229,9 +189,8 @@ func (h *HTTP) write(reqBody []byte) error {
func init() {
outputs.Add("http", func() telegraf.Output {
return &HTTP{
Timeout: config.Duration(defaultClientTimeout),
Method: defaultMethod,
URL: defaultURL,
Method: defaultMethod,
URL: defaultURL,
}
})
}

View File

@ -13,6 +13,8 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
oauth "github.com/influxdata/telegraf/plugins/common/oauth"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require"
)
@ -379,11 +381,15 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
{
name: "success",
plugin: &HTTP{
URL: u.String() + "/write",
ClientID: "howdy",
ClientSecret: "secret",
TokenURL: u.String() + "/token",
Scopes: []string{"urn:opc:idm:__myscopes__"},
URL: u.String() + "/write",
HTTPClientConfig: httpconfig.HTTPClientConfig{
OAuth2Config: oauth.OAuth2Config{
ClientID: "howdy",
ClientSecret: "secret",
TokenURL: u.String() + "/token",
Scopes: []string{"urn:opc:idm:__myscopes__"},
},
},
},
tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)