diff --git a/plugins/common/http/config.go b/plugins/common/http/config.go new file mode 100644 index 000000000..b61a346be --- /dev/null +++ b/plugins/common/http/config.go @@ -0,0 +1,54 @@ +package httpconfig + +import ( + "context" + "net/http" + "time" + + "github.com/influxdata/telegraf/config" + oauthConfig "github.com/influxdata/telegraf/plugins/common/oauth" + "github.com/influxdata/telegraf/plugins/common/proxy" + "github.com/influxdata/telegraf/plugins/common/tls" +) + +// Common HTTP client struct. +type HTTPClientConfig struct { + Timeout config.Duration `toml:"timeout"` + IdleConnTimeout config.Duration `toml:"idle_conn_timeout"` + + proxy.HTTPProxy + tls.ClientConfig + oauthConfig.OAuth2Config +} + +func (h *HTTPClientConfig) CreateClient(ctx context.Context) (*http.Client, error) { + tlsCfg, err := h.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + prox, err := h.HTTPProxy.Proxy() + if err != nil { + return nil, err + } + + transport := &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: prox, + IdleConnTimeout: time.Duration(h.IdleConnTimeout), + } + + timeout := h.Timeout + if timeout == 0 { + timeout = config.Duration(time.Second * 5) + } + + client := &http.Client{ + Transport: transport, + Timeout: time.Duration(timeout), + } + + client = h.OAuth2Config.CreateOauth2Client(ctx, client) + + return client, nil +} diff --git a/plugins/common/oauth/config.go b/plugins/common/oauth/config.go new file mode 100644 index 000000000..aa42a7a65 --- /dev/null +++ b/plugins/common/oauth/config.go @@ -0,0 +1,32 @@ +package oauth + +import ( + "context" + "net/http" + + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" +) + +type OAuth2Config struct { + // OAuth2 Credentials + ClientID string `toml:"client_id"` + ClientSecret string `toml:"client_secret"` + TokenURL string `toml:"token_url"` + Scopes []string `toml:"scopes"` +} + +func (o *OAuth2Config) CreateOauth2Client(ctx context.Context, client *http.Client) *http.Client { + if o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != "" { + oauthConfig := clientcredentials.Config{ + ClientID: o.ClientID, + ClientSecret: o.ClientSecret, + TokenURL: o.TokenURL, + Scopes: o.Scopes, + } + ctx = context.WithValue(ctx, oauth2.HTTPClient, client) + client = oauthConfig.Client(ctx) + } + + return client +} diff --git a/plugins/inputs/http/README.md b/plugins/inputs/http/README.md index a9c554cad..4b799043b 100644 --- a/plugins/inputs/http/README.md +++ b/plugins/inputs/http/README.md @@ -34,6 +34,12 @@ The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The # username = "username" # password = "pa$$word" + ## OAuth2 Client Credentials. The options 'client_id', 'client_secret', and 'token_url' are required to use OAuth2. + # client_id = "clientid" + # client_secret = "secret" + # token_url = "https://indentityprovider/oauth2/v1/token" + # scopes = ["urn:opc:idm:__myscopes__"] + ## HTTP Proxy support # http_proxy_url = "" diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index 3c5b80a8e..a0cffd07d 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -1,19 +1,17 @@ package http import ( + "context" "fmt" "io" "io/ioutil" "net/http" "strings" "sync" - "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/common/proxy" - "github.com/influxdata/telegraf/plugins/common/tls" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -29,18 +27,14 @@ type HTTP struct { // HTTP Basic Auth Credentials Username string `toml:"username"` Password string `toml:"password"` - tls.ClientConfig - - proxy.HTTPProxy // Absolute path to file with Bearer token BearerToken string `toml:"bearer_token"` SuccessStatusCodes []int `toml:"success_status_codes"` - Timeout config.Duration `toml:"timeout"` - client *http.Client + httpconfig.HTTPClientConfig // The parser will automatically be set by Telegraf core code because // this plugin implements the ParserInput interface (i.e. the SetParser method) @@ -77,6 +71,12 @@ var sampleConfig = ` ## HTTP Proxy support # http_proxy_url = "" + ## OAuth2 Client Credentials Grant + # client_id = "clientid" + # client_secret = "secret" + # token_url = "https://indentityprovider/oauth2/v1/token" + # scopes = ["urn:opc:idm:__myscopes__"] + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -108,25 +108,13 @@ func (*HTTP) Description() string { } func (h *HTTP) Init() error { - tlsCfg, err := h.ClientConfig.TLSConfig() + ctx := context.Background() + client, err := h.HTTPClientConfig.CreateClient(ctx) if err != nil { return err } - proxy, err := h.HTTPProxy.Proxy() - if err != nil { - return err - } - - transport := &http.Transport{ - TLSClientConfig: tlsCfg, - Proxy: proxy, - } - - h.client = &http.Client{ - Transport: transport, - Timeout: time.Duration(h.Timeout), - } + h.client = client // Set default as [200] if len(h.SuccessStatusCodes) == 0 { @@ -262,8 +250,7 @@ func makeRequestBodyReader(contentEncoding, body string) (io.ReadCloser, error) func init() { inputs.Add("http", func() telegraf.Input { return &HTTP{ - Timeout: config.Duration(time.Second * 5), - Method: "GET", + Method: "GET", } }) } diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index edd0b2004..02351effc 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -6,8 +6,11 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "net/url" "testing" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" + oauth "github.com/influxdata/telegraf/plugins/common/oauth" plugin "github.com/influxdata/telegraf/plugins/inputs/http" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -252,3 +255,82 @@ func TestBodyAndContentEncoding(t *testing.T) { }) } } + +type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + +func TestOAuthClientCredentialsGrant(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + var token = "2YotnFZFEjr1zCsicMWpAA" + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *plugin.HTTP + tokenHandler TestHandlerFunc + handler TestHandlerFunc + }{ + { + name: "no credentials", + plugin: &plugin.HTTP{ + URLs: []string{u.String()}, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Len(t, r.Header["Authorization"], 0) + w.WriteHeader(http.StatusOK) + }, + }, + { + name: "success", + plugin: &plugin.HTTP{ + URLs: []string{u.String() + "/write"}, + HTTPClientConfig: httpconfig.HTTPClientConfig{ + OAuth2Config: oauth.OAuth2Config{ + ClientID: "howdy", + ClientSecret: "secret", + TokenURL: u.String() + "/token", + Scopes: []string{"urn:opc:idm:__myscopes__"}, + }, + }, + }, + tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + values := url.Values{} + values.Add("access_token", token) + values.Add("token_type", "bearer") + values.Add("expires_in", "3600") + _, err := w.Write([]byte(values.Encode())) + require.NoError(t, err) + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"]) + w.WriteHeader(http.StatusOK) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + tt.handler(t, w, r) + case "/token": + tt.tokenHandler(t, w, r) + } + }) + + parser, _ := parsers.NewValueParser("metric", "string", "", nil) + tt.plugin.SetParser(parser) + err = tt.plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + err = tt.plugin.Gather(&acc) + require.NoError(t, err) + }) + } +} diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index 82ae230ec..5da273f2d 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -11,13 +11,11 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" - "golang.org/x/oauth2" - "golang.org/x/oauth2/clientcredentials" ) const ( @@ -80,18 +78,13 @@ const ( type HTTP struct { URL string `toml:"url"` - Timeout config.Duration `toml:"timeout"` Method string `toml:"method"` Username string `toml:"username"` Password string `toml:"password"` Headers map[string]string `toml:"headers"` - ClientID string `toml:"client_id"` - ClientSecret string `toml:"client_secret"` - TokenURL string `toml:"token_url"` - Scopes []string `toml:"scopes"` ContentEncoding string `toml:"content_encoding"` - IdleConnTimeout config.Duration `toml:"idle_conn_timeout"` tls.ClientConfig + httpconfig.HTTPClientConfig client *http.Client serializer serializers.Serializer @@ -101,35 +94,6 @@ func (h *HTTP) SetSerializer(serializer serializers.Serializer) { h.serializer = serializer } -func (h *HTTP) createClient(ctx context.Context) (*http.Client, error) { - tlsCfg, err := h.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - Proxy: http.ProxyFromEnvironment, - IdleConnTimeout: time.Duration(h.IdleConnTimeout), - }, - Timeout: time.Duration(h.Timeout), - } - - if h.ClientID != "" && h.ClientSecret != "" && h.TokenURL != "" { - oauthConfig := clientcredentials.Config{ - ClientID: h.ClientID, - ClientSecret: h.ClientSecret, - TokenURL: h.TokenURL, - Scopes: h.Scopes, - } - ctx = context.WithValue(ctx, oauth2.HTTPClient, client) - client = oauthConfig.Client(ctx) - } - - return client, nil -} - func (h *HTTP) Connect() error { if h.Method == "" { h.Method = http.MethodPost @@ -139,12 +103,8 @@ func (h *HTTP) Connect() error { return fmt.Errorf("invalid method [%s] %s", h.URL, h.Method) } - if h.Timeout == 0 { - h.Timeout = config.Duration(defaultClientTimeout) - } - ctx := context.Background() - client, err := h.createClient(ctx) + client, err := h.HTTPClientConfig.CreateClient(ctx) if err != nil { return err } @@ -229,9 +189,8 @@ func (h *HTTP) write(reqBody []byte) error { func init() { outputs.Add("http", func() telegraf.Output { return &HTTP{ - Timeout: config.Duration(defaultClientTimeout), - Method: defaultMethod, - URL: defaultURL, + Method: defaultMethod, + URL: defaultURL, } }) } diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index a09f7dd7e..8089f45f5 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -13,6 +13,8 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" + oauth "github.com/influxdata/telegraf/plugins/common/oauth" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/stretchr/testify/require" ) @@ -379,11 +381,15 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { { name: "success", plugin: &HTTP{ - URL: u.String() + "/write", - ClientID: "howdy", - ClientSecret: "secret", - TokenURL: u.String() + "/token", - Scopes: []string{"urn:opc:idm:__myscopes__"}, + URL: u.String() + "/write", + HTTPClientConfig: httpconfig.HTTPClientConfig{ + OAuth2Config: oauth.OAuth2Config{ + ClientID: "howdy", + ClientSecret: "secret", + TokenURL: u.String() + "/token", + Scopes: []string{"urn:opc:idm:__myscopes__"}, + }, + }, }, tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK)