From fe8467539aecbcee17ef08841f9a13a8e3175aa4 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 25 Jul 2023 00:08:16 +0200 Subject: [PATCH] feat(inputs.influxdb_listener): Add token based authentication (#13610) --- go.mod | 3 +- go.sum | 2 + internal/http.go | 108 ++++++++++++++++-- plugins/inputs/dcos/client.go | 2 +- plugins/inputs/dcos/client_test.go | 5 +- plugins/inputs/dcos/dcos.go | 2 +- plugins/inputs/dcos/dcos_test.go | 3 +- plugins/inputs/influxdb_listener/README.md | 14 ++- .../influxdb_listener/influxdb_listener.go | 29 ++++- .../influxdb_listener_test.go | 99 ++++++++++++++++ plugins/inputs/influxdb_listener/sample.conf | 14 ++- plugins/outputs/health/health.go | 2 +- .../prometheus_client/prometheus_client.go | 2 +- 13 files changed, 258 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 3928447d6..f40d6822d 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/go-stomp/stomp v2.1.4+incompatible github.com/gobwas/glob v0.2.3 github.com/gofrs/uuid/v5 v5.0.0 - github.com/golang-jwt/jwt/v4 v4.5.0 + github.com/golang-jwt/jwt/v5 v5.0.0 github.com/golang/geo v0.0.0-20190916061304-5b978397cfec github.com/golang/snappy v0.0.4 github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc @@ -308,6 +308,7 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/go.sum b/go.sum index 34806087d..9cfa1ef19 100644 --- a/go.sum +++ b/go.sum @@ -599,6 +599,8 @@ github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= +github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= diff --git a/internal/http.go b/internal/http.go index a2b3d6293..832f82e37 100644 --- a/internal/http.go +++ b/internal/http.go @@ -3,16 +3,88 @@ package internal import ( "crypto/subtle" "errors" + "fmt" "net" "net/http" "net/url" + "strings" + + "github.com/golang-jwt/jwt/v5" ) type BasicAuthErrorFunc func(rw http.ResponseWriter) -// AuthHandler returns a http handler that requires HTTP basic auth +// JWTAuthHandler returns a http handler that requires the HTTP bearer auth +// token to be valid and match the given user. +func JWTAuthHandler(secret, username string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler { + return func(h http.Handler) http.Handler { + return &jwtAuthHandler{ + secret: []byte(secret), + username: []byte(username), + onError: onError, + next: h, + } + } +} + +type jwtAuthHandler struct { + secret []byte + username []byte + onError BasicAuthErrorFunc + next http.Handler +} + +func (h *jwtAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + authHeader := req.Header.Get("Authentication") + if !strings.HasPrefix(authHeader, "Bearer ") { + h.onError(rw) + http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + bearer := strings.TrimPrefix(authHeader, "Bearer ") + token, err := jwt.Parse(bearer, func(t *jwt.Token) (interface{}, error) { + if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, fmt.Errorf("unexpected signing method: %v", t.Method) + } + return h.secret, nil + }) + if err != nil || !token.Valid { + h.onError(rw) + if err != nil && errors.Is(err, jwt.ErrTokenExpired) { + http.Error(rw, "token expired", http.StatusUnauthorized) + } else if err != nil { + http.Error(rw, "invalid token: "+err.Error(), http.StatusUnauthorized) + } else { + http.Error(rw, "invalid token", http.StatusUnauthorized) + } + return + } + + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + h.onError(rw) + http.Error(rw, "problem authenticating token", http.StatusInternalServerError) + return + } + + username, ok := claims["username"].(string) + if !ok || username == "" { + h.onError(rw) + http.Error(rw, "token must contain a string username", http.StatusUnauthorized) + return + } + if subtle.ConstantTimeCompare([]byte(username), h.username) != 1 { + h.onError(rw) + http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + + h.next.ServeHTTP(rw, req) +} + +// BasicAuthHandler returns a http handler that requires HTTP basic auth // credentials to match the given username and password. -func AuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler { +func BasicAuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler { return func(h http.Handler) http.Handler { return &basicAuthHandler{ username: username, @@ -33,16 +105,28 @@ type basicAuthHandler struct { } func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if h.username != "" || h.password != "" { - reqUsername, reqPassword, ok := req.BasicAuth() - if !ok || - subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 || - subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 { - rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"") - h.onError(rw) - http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return - } + if h.username == "" && h.password == "" { + h.next.ServeHTTP(rw, req) + return + } + + var reqUsername, reqPassword string + var ok bool + authHeader := req.Header.Get("Authorization") + if strings.HasPrefix(authHeader, "Token ") { + token := strings.TrimPrefix(authHeader, "Token ") + reqUsername, reqPassword, ok = strings.Cut(token, ":") + } else { + reqUsername, reqPassword, ok = req.BasicAuth() + } + + if !ok || + subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 || + subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 { + rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"") + h.onError(rw) + http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return } h.next.ServeHTTP(rw, req) diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index d4ebd2b03..4719cff2a 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -10,7 +10,7 @@ import ( "net/url" "time" - "github.com/golang-jwt/jwt/v4" + "github.com/golang-jwt/jwt/v5" ) const ( diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go index 70cf9ce7c..1c2b5f156 100644 --- a/plugins/inputs/dcos/client_test.go +++ b/plugins/inputs/dcos/client_test.go @@ -8,9 +8,10 @@ import ( "net/url" "testing" - jwt "github.com/golang-jwt/jwt/v4" - "github.com/influxdata/telegraf/testutil" + jwt "github.com/golang-jwt/jwt/v5" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) var privateKey = testutil.NewPKI("../../../testutil/pki").ReadServerKey() diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index b9977660f..776134bd0 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -12,7 +12,7 @@ import ( "sync" "time" - "github.com/golang-jwt/jwt/v4" + "github.com/golang-jwt/jwt/v5" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go index 828fd0af6..9aa78c573 100644 --- a/plugins/inputs/dcos/dcos_test.go +++ b/plugins/inputs/dcos/dcos_test.go @@ -5,8 +5,9 @@ import ( "fmt" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) type mockClient struct { diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index f33c67719..baea59aa2 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -79,11 +79,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## the value of this tag name. # retention_policy_tag = "" - ## Optional username and password to accept for HTTP basic authentication. + ## Optional username and password to accept for HTTP basic authentication + ## or authentication token. ## You probably want to make sure you have TLS configured above for this. + ## Use these options for the authentication token in the form + ## Authentication: Token : # basic_username = "foobar" # basic_password = "barfoo" + ## Optional JWT token authentication for HTTP requests + ## Please see the documentation at + ## https://docs.influxdata.com/influxdb/v1.8/administration/authentication_and_authorization/#authenticate-using-jwt-tokens + ## for further details. + ## Please note: Token authentication and basic authentication cannot be used + ## at the same time. + # token_shared_secret = "" + # token_username = "" + ## Influx line protocol parser ## 'internal' is the default. 'upstream' is a newer parser that is faster ## and more memory efficient. diff --git a/plugins/inputs/influxdb_listener/influxdb_listener.go b/plugins/inputs/influxdb_listener/influxdb_listener.go index e1b8cd84a..d9bc4c30c 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener.go @@ -43,6 +43,8 @@ type InfluxDBListener struct { MaxLineSize config.Size `toml:"max_line_size" deprecated:"1.14.0;parser now handles lines of unlimited length and option is ignored"` BasicUsername string `toml:"basic_username"` BasicPassword string `toml:"basic_password"` + TokenSharedSecret string `toml:"token_shared_secret"` + TokenUsername string `toml:"token_username"` DatabaseTag string `toml:"database_tag"` RetentionPolicyTag string `toml:"retention_policy_tag"` ParserType string `toml:"parser_type"` @@ -78,11 +80,20 @@ func (h *InfluxDBListener) Gather(_ telegraf.Accumulator) error { } func (h *InfluxDBListener) routes() { - authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, "influxdb", - func(_ http.ResponseWriter) { - h.authFailures.Incr(1) - }, - ) + var authHandler func(http.Handler) http.Handler + if h.TokenSharedSecret != "" { + authHandler = internal.JWTAuthHandler(h.TokenSharedSecret, h.TokenUsername, + func(_ http.ResponseWriter) { + h.authFailures.Incr(1) + }, + ) + } else { + authHandler = internal.BasicAuthHandler(h.BasicUsername, h.BasicPassword, "influxdb", + func(_ http.ResponseWriter) { + h.authFailures.Incr(1) + }, + ) + } h.mux.Handle("/write", authHandler(h.handleWrite())) h.mux.Handle("/query", authHandler(h.handleQuery())) @@ -91,6 +102,14 @@ func (h *InfluxDBListener) routes() { } func (h *InfluxDBListener) Init() error { + // Check the config setting + if (h.BasicUsername != "" || h.BasicPassword != "") && (h.TokenSharedSecret != "" || h.TokenUsername != "") { + return errors.New("cannot use basic-auth and tokens at the same time") + } + if h.TokenSharedSecret != "" && h.TokenUsername == "" || h.TokenSharedSecret == "" && h.TokenUsername != "" { + return errors.New("neither 'token_shared_secret' nor 'token_username' can be empty for token authentication") + } + tags := map[string]string{ "address": h.ServiceAddress, } diff --git a/plugins/inputs/influxdb_listener/influxdb_listener_test.go b/plugins/inputs/influxdb_listener/influxdb_listener_test.go index 1c3ceedaf..1f629f6a2 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener_test.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener_test.go @@ -5,15 +5,18 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "net/http" "net/url" "os" "runtime" "strconv" + "strings" "sync" "testing" "time" + "github.com/golang-jwt/jwt/v5" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -163,6 +166,102 @@ func TestWriteBasicAuth(t *testing.T) { require.EqualValues(t, http.StatusNoContent, resp.StatusCode) } +func TestWriteToken(t *testing.T) { + plugin := &InfluxDBListener{ + ServiceAddress: "localhost:0", + TokenSharedSecret: "a S3cr3T $sTr1ng", + TokenUsername: "John Doe", + Log: testutil.Logger{}, + timeFunc: time.Now, + } + require.NoError(t, plugin.Init()) + + // Create a valid token + token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{ + "username": plugin.TokenUsername, + "exp": time.Now().Add(5 * time.Minute).Unix(), + }).SignedString([]byte(plugin.TokenSharedSecret)) + require.NoError(t, err) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + client := &http.Client{} + req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + req.Header.Add("Authentication", "Bearer "+token) + resp, err := client.Do(req) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, http.StatusNoContent, resp.StatusCode) +} + +func TestWriteTokenInvalidUser(t *testing.T) { + plugin := &InfluxDBListener{ + ServiceAddress: "localhost:0", + TokenSharedSecret: "a S3cr3T $sTr1ng", + TokenUsername: "John Doe", + Log: testutil.Logger{}, + timeFunc: time.Now, + } + require.NoError(t, plugin.Init()) + + // Create a valid token + token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{ + "username": "peter", + "exp": time.Now().Add(5 * time.Minute).Unix(), + }).SignedString([]byte(plugin.TokenSharedSecret)) + require.NoError(t, err) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + client := &http.Client{} + req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + req.Header.Add("Authentication", "Bearer "+token) + resp, err := client.Do(req) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, http.StatusUnauthorized, resp.StatusCode) +} + +func TestWriteTokenExpired(t *testing.T) { + plugin := &InfluxDBListener{ + ServiceAddress: "localhost:0", + TokenSharedSecret: "a S3cr3T $sTr1ng", + TokenUsername: "John Doe", + Log: testutil.Logger{}, + timeFunc: time.Now, + } + require.NoError(t, plugin.Init()) + + // Create a valid token + token, err := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{ + "username": plugin.TokenUsername, + "exp": time.Now().Add(-5 * time.Minute).Unix(), + }).SignedString([]byte(plugin.TokenSharedSecret)) + require.NoError(t, err) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + client := &http.Client{} + req, err := http.NewRequest("POST", createURL(plugin, "http", "/write", "db=mydb"), bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + req.Header.Add("Authentication", "Bearer "+token) + resp, err := client.Do(req) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, http.StatusUnauthorized, resp.StatusCode) + require.EqualValues(t, "token expired", strings.TrimSpace(string(body))) +} + func TestWriteKeepDatabase(t *testing.T) { testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n" diff --git a/plugins/inputs/influxdb_listener/sample.conf b/plugins/inputs/influxdb_listener/sample.conf index f8d4aa8e3..1d1544632 100644 --- a/plugins/inputs/influxdb_listener/sample.conf +++ b/plugins/inputs/influxdb_listener/sample.conf @@ -36,11 +36,23 @@ ## the value of this tag name. # retention_policy_tag = "" - ## Optional username and password to accept for HTTP basic authentication. + ## Optional username and password to accept for HTTP basic authentication + ## or authentication token. ## You probably want to make sure you have TLS configured above for this. + ## Use these options for the authentication token in the form + ## Authentication: Token : # basic_username = "foobar" # basic_password = "barfoo" + ## Optional JWT token authentication for HTTP requests + ## Please see the documentation at + ## https://docs.influxdata.com/influxdb/v1.8/administration/authentication_and_authorization/#authenticate-using-jwt-tokens + ## for further details. + ## Please note: Token authentication and basic authentication cannot be used + ## at the same time. + # token_shared_secret = "" + # token_username = "" + ## Influx line protocol parser ## 'internal' is the default. 'upstream' is a newer parser that is faster ## and more memory efficient. diff --git a/plugins/outputs/health/health.go b/plugins/outputs/health/health.go index 2906c0baa..419044d4c 100644 --- a/plugins/outputs/health/health.go +++ b/plugins/outputs/health/health.go @@ -99,7 +99,7 @@ func (h *Health) Init() error { // Connect starts the HTTP server. func (h *Health) Connect() error { - authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, "health", onAuthError) + authHandler := internal.BasicAuthHandler(h.BasicUsername, h.BasicPassword, "health", onAuthError) h.server = &http.Server{ Addr: h.ServiceAddress, diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index f91a2406d..610ed9444 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -123,7 +123,7 @@ func (p *PrometheusClient) Init() error { ipRange = append(ipRange, ipNet) } - authHandler := internal.AuthHandler(p.BasicUsername, p.BasicPassword, "prometheus", onAuthError) + authHandler := internal.BasicAuthHandler(p.BasicUsername, p.BasicPassword, "prometheus", onAuthError) rangeHandler := internal.IPRangeHandler(ipRange, onError) promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}) landingPageHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {