From e46f90e89ca00f422c5941673765bc834799d4a8 Mon Sep 17 00:00:00 2001 From: Wilfried OLLIVIER Date: Wed, 7 Sep 2022 21:25:32 +0200 Subject: [PATCH] fix(inputs.mongodb): add an option to bypass connection errors on start (#11629) --- plugins/inputs/mongodb/README.md | 6 ++ plugins/inputs/mongodb/mongodb.go | 64 +++++++++++++------ plugins/inputs/mongodb/mongodb_server.go | 7 ++ plugins/inputs/mongodb/mongodb_server_test.go | 54 ++++++++++++++++ plugins/inputs/mongodb/sample.conf | 6 ++ 5 files changed, 119 insertions(+), 18 deletions(-) diff --git a/plugins/inputs/mongodb/README.md b/plugins/inputs/mongodb/README.md index 1d011ffc4..74a204a48 100644 --- a/plugins/inputs/mongodb/README.md +++ b/plugins/inputs/mongodb/README.md @@ -43,6 +43,12 @@ All MongoDB server versions from 2.6 and higher are supported. # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false + + ## Specifies plugin behavior regarding disconnected servers + ## Available choices: + ## - error: telegraf will return an error on startup if one of the servers is unreachable + ## - skip: telegraf will skip unreachable servers on both startup and gather + # disconnected_servers_behavior = "error" ``` ### Permissions diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index 23d115113..c131fe0ac 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -17,6 +17,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -25,19 +26,23 @@ import ( //go:embed sample.conf var sampleConfig string +var DisconnectedServersBehaviors = []string{"error", "skip"} + type MongoDB struct { - Servers []string - Ssl Ssl - GatherClusterStatus bool - GatherPerdbStats bool - GatherColStats bool - GatherTopStat bool 
- ColStatsDbs []string + Servers []string + Ssl Ssl + GatherClusterStatus bool + GatherPerdbStats bool + GatherColStats bool + GatherTopStat bool + DisconnectedServersBehavior string + ColStatsDbs []string tlsint.ClientConfig Log telegraf.Logger `toml:"-"` - clients []*Server + clients []*Server + tlsConfig *tls.Config } type Ssl struct { @@ -50,10 +55,17 @@ func (*MongoDB) SampleConfig() string { } func (m *MongoDB) Init() error { - var tlsConfig *tls.Config + if m.DisconnectedServersBehavior == "" { + m.DisconnectedServersBehavior = "error" + } + + if err := choice.Check(m.DisconnectedServersBehavior, DisconnectedServersBehaviors); err != nil { + return fmt.Errorf("disconnected_servers_behavior: %w", err) + } + if m.Ssl.Enabled { // Deprecated TLS config - tlsConfig = &tls.Config{ + m.tlsConfig = &tls.Config{ InsecureSkipVerify: m.ClientConfig.InsecureSkipVerify, } if len(m.Ssl.CaCerts) == 0 { @@ -66,10 +78,10 @@ func (m *MongoDB) Init() error { return fmt.Errorf("failed to parse root certificate") } } - tlsConfig.RootCAs = roots + m.tlsConfig.RootCAs = roots } else { var err error - tlsConfig, err = m.ClientConfig.TLSConfig() + m.tlsConfig, err = m.ClientConfig.TLSConfig() if err != nil { return err } @@ -79,6 +91,11 @@ func (m *MongoDB) Init() error { m.Servers = []string{"mongodb://127.0.0.1:27017"} } + return nil +} + +// Start is meant to run after Init; it sets up the MongoDB connection for each configured server. NOTE(review): confirm the agent actually invokes this parameterless Start - Telegraf's ServiceInput interface is believed to declare Start(Accumulator). +func (m *MongoDB) Start() error { for _, connURL := range m.Servers { if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") { // Preserve backwards compatibility for hostnames without a @@ -89,15 +106,15 @@ func (m *MongoDB) Init() error { u, err := url.Parse(connURL) if err != nil { - return fmt.Errorf("unable to parse connection URL: %q", err) + return fmt.Errorf("unable to parse connection URL: %w", err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() //nolint:revive opts := 
options.Client().ApplyURI(connURL) - if tlsConfig != nil { - opts.TLSConfig = tlsConfig + if m.tlsConfig != nil { + opts.TLSConfig = m.tlsConfig } if opts.ReadPreference == nil { opts.ReadPreference = readpref.Nearest() @@ -105,12 +122,16 @@ func (m *MongoDB) Init() error { client, err := mongo.Connect(ctx, opts) if err != nil { - return fmt.Errorf("unable to connect to MongoDB: %q", err) + return fmt.Errorf("unable to connect to MongoDB: %w", err) } err = client.Ping(ctx, opts.ReadPreference) if err != nil { - return fmt.Errorf("unable to connect to MongoDB: %s", err) + if m.DisconnectedServersBehavior == "error" { + return fmt.Errorf("unable to ping MongoDB: %w", err) + } + + m.Log.Errorf("unable to ping MongoDB: %v", err) // skip mode: log and carry on; %w is only valid in fmt.Errorf } server := &Server{ @@ -132,9 +153,16 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(srv *Server) { defer wg.Done() + if m.DisconnectedServersBehavior == "skip" { + if err := srv.ping(); err != nil { + m.Log.Debugf("failed to ping server: %v", err) + return + } + } + err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs) if err != nil { - m.Log.Errorf("failed to gather data: %q", err) + m.Log.Errorf("failed to gather data: %v", err) } }(client) } diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go index a60d0e6e8..bca83297b 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -37,6 +37,13 @@ func IsAuthorization(err error) bool { return strings.Contains(err.Error(), "not authorized") } +func (s *Server) ping() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + return s.client.Ping(ctx, nil) +} + func (s *Server) authLog(err error) { if IsAuthorization(err) { s.Log.Debug(err.Error()) diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go index 
62b109a0f..b2d3688ac 100644 --- a/plugins/inputs/mongodb/mongodb_server_test.go +++ b/plugins/inputs/mongodb/mongodb_server_test.go @@ -12,6 +12,7 @@ import ( ) var ServicePort = "27017" +var unreachableMongoEndpoint = "mongodb://user:pass@127.0.0.1:27017/nop" func createTestServer(t *testing.T) *testutil.Container { container := testutil.Container{ @@ -46,6 +47,8 @@ func TestGetDefaultTagsIntegration(t *testing.T) { } err := m.Init() require.NoError(t, err) + err = m.Start() + require.NoError(t, err) server := m.clients[0] @@ -81,6 +84,8 @@ func TestAddDefaultStatsIntegration(t *testing.T) { } err := m.Init() require.NoError(t, err) + err = m.Start() + require.NoError(t, err) server := m.clients[0] @@ -97,6 +102,55 @@ func TestAddDefaultStatsIntegration(t *testing.T) { } } +func TestSkipBehaviorIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + m := &MongoDB{ + Log: &testutil.CaptureLogger{}, + Servers: []string{unreachableMongoEndpoint}, + } + + m.DisconnectedServersBehavior = "skip" + err := m.Init() + require.NoError(t, err) + err = m.Start() + require.NoError(t, err) + + var acc testutil.Accumulator + err = m.Gather(&acc) + require.NoError(t, err) + require.NotContains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ") +} + +func TestErrorBehaviorIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + m := &MongoDB{ + Log: &testutil.CaptureLogger{}, + Servers: []string{unreachableMongoEndpoint}, + } + + err := m.Init() + require.NoError(t, err) + err = m.Start() + require.Error(t, err) + + // Re-run Start in skip mode so the failed ping is only logged, then restore error mode so Gather does not ping before gathering + m.DisconnectedServersBehavior = "skip" + err = m.Start() + require.NoError(t, err) + m.DisconnectedServersBehavior = "error" + + var acc testutil.Accumulator + err = m.Gather(&acc) + require.NoError(t, err) + require.Contains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ") +} + 
func TestPoolStatsVersionCompatibility(t *testing.T) { tests := []struct { name string diff --git a/plugins/inputs/mongodb/sample.conf b/plugins/inputs/mongodb/sample.conf index 5e9fbba17..d3985e079 100644 --- a/plugins/inputs/mongodb/sample.conf +++ b/plugins/inputs/mongodb/sample.conf @@ -36,3 +36,9 @@ # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false + + ## Specifies plugin behavior regarding disconnected servers + ## Available choices: + ## - error: telegraf will return an error on startup if one of the servers is unreachable + ## - skip: telegraf will skip unreachable servers on both startup and gather + # disconnected_servers_behavior = "error"