From 2476640d0fcfbf4868d9ba4e54a4df02bdf0a1a1 Mon Sep 17 00:00:00 2001 From: Neelay Upadhyaya Date: Mon, 22 May 2023 13:09:16 +0530 Subject: [PATCH] feat(inputs.sql): Add 'disconnected_servers_behavior' field in the configuration (#13289) --- plugins/inputs/sql/README.md | 6 +++ plugins/inputs/sql/sample.conf | 6 +++ plugins/inputs/sql/sql.go | 77 +++++++++++++++++++++++++++------- 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/sql/README.md b/plugins/inputs/sql/README.md index 3c99d2fa7..5573a3001 100644 --- a/plugins/inputs/sql/README.md +++ b/plugins/inputs/sql/README.md @@ -56,6 +56,12 @@ to use them. # connection_max_open = 0 # connection_max_idle = auto + ## Specifies plugin behavior regarding disconnected servers + ## Available choices : + ## - error: telegraf will return an error on startup if one the servers is unreachable + ## - ignore: telegraf will ignore unreachable servers on both startup and gather + # disconnected_servers_behavior = "error" + [[inputs.sql.query]] ## Query to perform on the server query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0" diff --git a/plugins/inputs/sql/sample.conf b/plugins/inputs/sql/sample.conf index af7a9df8c..c17cbee41 100644 --- a/plugins/inputs/sql/sample.conf +++ b/plugins/inputs/sql/sample.conf @@ -28,6 +28,12 @@ # connection_max_open = 0 # connection_max_idle = auto + ## Specifies plugin behavior regarding disconnected servers + ## Available choices : + ## - error: telegraf will return an error on startup if one the servers is unreachable + ## - ignore: telegraf will ignore unreachable servers on both startup and gather + # disconnected_servers_behavior = "error" + [[inputs.sql.query]] ## Query to perform on the server query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0" diff --git a/plugins/inputs/sql/sql.go b/plugins/inputs/sql/sql.go index 1008ed293..1d833310d 100644 --- a/plugins/inputs/sql/sql.go +++ b/plugins/inputs/sql/sql.go @@ -26,6 +26,8 @@ var sampleConfig string const magicIdleCount = -int(^uint(0) >> 1) +var disconnectedServersBehavior = []string{"error", "ignore"} + type Query struct { Query string `toml:"query"` Script string `toml:"query_script"` @@ -205,18 +207,20 @@ func (q *Query) parse(acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time) ( } type SQL struct { - Driver string `toml:"driver"` - Dsn config.Secret `toml:"dsn"` - Timeout config.Duration `toml:"timeout"` - MaxIdleTime config.Duration `toml:"connection_max_idle_time"` - MaxLifetime config.Duration `toml:"connection_max_life_time"` - MaxOpenConnections int `toml:"connection_max_open"` - MaxIdleConnections int `toml:"connection_max_idle"` - Queries []Query `toml:"query"` - Log telegraf.Logger `toml:"-"` + Driver string `toml:"driver"` + Dsn config.Secret `toml:"dsn"` + Timeout config.Duration `toml:"timeout"` + MaxIdleTime config.Duration `toml:"connection_max_idle_time"` + MaxLifetime config.Duration `toml:"connection_max_life_time"` + MaxOpenConnections int `toml:"connection_max_open"` + MaxIdleConnections int `toml:"connection_max_idle"` + Queries []Query `toml:"query"` + Log telegraf.Logger `toml:"-"` + DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"` - driverName string - db *dbsql.DB + driverName string + db *dbsql.DB + serverConnected bool } func (*SQL) SampleConfig() string { @@ -351,12 +355,18 @@ func (s *SQL) Init() error { return fmt.Errorf("driver %q not supported use one of %v", s.Driver, availDrivers) } + if s.DisconnectedServersBehavior == "" { + s.DisconnectedServersBehavior = "error" + } + + if !choice.Contains(s.DisconnectedServersBehavior, disconnectedServersBehavior) { + return fmt.Errorf("%q is not a valid value for disconnected_servers_behavior", s.DisconnectedServersBehavior) + } + return nil } -func (s *SQL) Start(_ telegraf.Accumulator) error { - var err error - +func (s *SQL) setupConnection() error { // Connect to the database server dsnSecret, err := s.Dsn.Get() if err != nil { @@ -364,9 +374,11 @@ func (s *SQL) Start(_ telegraf.Accumulator) error { } dsn := string(dsnSecret) config.ReleaseSecret(dsnSecret) + s.Log.Debug("Connecting...") s.db, err = dbsql.Open(s.driverName, dsn) if err != nil { + // should return since the error is most likely with invalid DSN string format return err } @@ -375,16 +387,23 @@ func (s *SQL) Start(_ telegraf.Accumulator) error { s.db.SetConnMaxLifetime(time.Duration(s.MaxLifetime)) s.db.SetMaxOpenConns(s.MaxOpenConnections) s.db.SetMaxIdleConns(s.MaxIdleConnections) + return nil +} +func (s *SQL) ping() error { // Test if the connection can be established s.Log.Debug("Testing connectivity...") ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Timeout)) - err = s.db.PingContext(ctx) + err := s.db.PingContext(ctx) cancel() if err != nil { - return fmt.Errorf("connecting to database failed: %w", err) + return fmt.Errorf("unable to connect to database: %w", err) } + s.serverConnected = true + return nil +} +func (s *SQL) prepareStatements() { // Prepare the statements for i, q := range s.Queries { s.Log.Debugf("Preparing statement %q...", q.Query) @@ -402,6 +421,22 @@ func (s *SQL) Start(_ telegraf.Accumulator) error { } s.Queries[i].statement = stmt } +} + +func (s *SQL) Start(_ telegraf.Accumulator) error { + if err := s.setupConnection(); err != nil { + return err + } + + if err := s.ping(); err != nil { + if s.DisconnectedServersBehavior == "error" { + return err + } + s.Log.Errorf("unable to connect to database: %s", err) + } + if s.serverConnected { + s.prepareStatements() + } return nil } @@ -425,6 +460,16 @@ func (s *SQL) Stop() { } func (s *SQL) Gather(acc telegraf.Accumulator) error { + // during plugin startup, it is possible that the server was not reachable. + // we try pinging the server in this collection cycle. + // we are only concerned with `prepareStatements` function to complete(return true), just once. + if !s.serverConnected { + if err := s.ping(); err != nil { + return err + } + s.prepareStatements() + } + var wg sync.WaitGroup tstart := time.Now() for _, query := range s.Queries {