feat(inputs.sql): Add 'disconnected_servers_behavior' field in the configuration (#13289)

This commit is contained in:
Neelay Upadhyaya 2023-05-22 13:09:16 +05:30 committed by GitHub
parent d06fb73228
commit 2476640d0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 16 deletions

View File

@ -56,6 +56,12 @@ to use them.
# connection_max_open = 0
# connection_max_idle = auto
## Specifies plugin behavior regarding disconnected servers
## Available choices:
## - error: telegraf will return an error on startup if one of the servers is unreachable
## - ignore: telegraf will ignore unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"
[[inputs.sql.query]]
## Query to perform on the server
query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0"

View File

@ -28,6 +28,12 @@
# connection_max_open = 0
# connection_max_idle = auto
## Specifies plugin behavior regarding disconnected servers
## Available choices:
## - error: telegraf will return an error on startup if one of the servers is unreachable
## - ignore: telegraf will ignore unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"
[[inputs.sql.query]]
## Query to perform on the server
query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0"

View File

@ -26,6 +26,8 @@ var sampleConfig string
const magicIdleCount = -int(^uint(0) >> 1)
// disconnectedServersBehavior lists the values accepted for the
// disconnected_servers_behavior option; Init rejects any other value.
var disconnectedServersBehavior = []string{"error", "ignore"}
type Query struct {
Query string `toml:"query"`
Script string `toml:"query_script"`
@ -205,18 +207,20 @@ func (q *Query) parse(acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time) (
}
// SQL holds the TOML-configurable options and the internal runtime state of
// the SQL input plugin.
type SQL struct {
Driver string `toml:"driver"`
Dsn config.Secret `toml:"dsn"`
Timeout config.Duration `toml:"timeout"`
MaxIdleTime config.Duration `toml:"connection_max_idle_time"`
MaxLifetime config.Duration `toml:"connection_max_life_time"`
MaxOpenConnections int `toml:"connection_max_open"`
MaxIdleConnections int `toml:"connection_max_idle"`
Queries []Query `toml:"query"`
Log telegraf.Logger `toml:"-"`
Driver string `toml:"driver"`
Dsn config.Secret `toml:"dsn"`
Timeout config.Duration `toml:"timeout"`
MaxIdleTime config.Duration `toml:"connection_max_idle_time"`
MaxLifetime config.Duration `toml:"connection_max_life_time"`
MaxOpenConnections int `toml:"connection_max_open"`
MaxIdleConnections int `toml:"connection_max_idle"`
Queries []Query `toml:"query"`
Log telegraf.Logger `toml:"-"`
// DisconnectedServersBehavior selects how an unreachable server is handled.
// Init defaults an empty value to "error" and accepts only "error" or "ignore".
DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"`
driverName string
db *dbsql.DB
driverName string
db *dbsql.DB
// serverConnected is set to true by ping once a connectivity test succeeds.
serverConnected bool
}
func (*SQL) SampleConfig() string {
@ -351,12 +355,18 @@ func (s *SQL) Init() error {
return fmt.Errorf("driver %q not supported use one of %v", s.Driver, availDrivers)
}
if s.DisconnectedServersBehavior == "" {
s.DisconnectedServersBehavior = "error"
}
if !choice.Contains(s.DisconnectedServersBehavior, disconnectedServersBehavior) {
return fmt.Errorf("%q is not a valid value for disconnected_servers_behavior", s.DisconnectedServersBehavior)
}
return nil
}
func (s *SQL) Start(_ telegraf.Accumulator) error {
var err error
func (s *SQL) setupConnection() error {
// Connect to the database server
dsnSecret, err := s.Dsn.Get()
if err != nil {
@ -364,9 +374,11 @@ func (s *SQL) Start(_ telegraf.Accumulator) error {
}
dsn := string(dsnSecret)
config.ReleaseSecret(dsnSecret)
s.Log.Debug("Connecting...")
s.db, err = dbsql.Open(s.driverName, dsn)
if err != nil {
// should return since the error is most likely with invalid DSN string format
return err
}
@ -375,16 +387,23 @@ func (s *SQL) Start(_ telegraf.Accumulator) error {
s.db.SetConnMaxLifetime(time.Duration(s.MaxLifetime))
s.db.SetMaxOpenConns(s.MaxOpenConnections)
s.db.SetMaxIdleConns(s.MaxIdleConnections)
return nil
}
// ping tests whether the database is reachable, bounding the attempt with
// s.Timeout. On success it marks the server as connected; on failure it
// returns the ping error wrapped with additional context.
func (s *SQL) ping() error {
// Test if the connection can be established
s.Log.Debug("Testing connectivity...")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Timeout))
err = s.db.PingContext(ctx)
err := s.db.PingContext(ctx)
// Release the timeout context's resources as soon as the ping has returned.
cancel()
if err != nil {
return fmt.Errorf("connecting to database failed: %w", err)
return fmt.Errorf("unable to connect to database: %w", err)
}
s.serverConnected = true
return nil
}
func (s *SQL) prepareStatements() {
// Prepare the statements
for i, q := range s.Queries {
s.Log.Debugf("Preparing statement %q...", q.Query)
@ -402,6 +421,22 @@ func (s *SQL) Start(_ telegraf.Accumulator) error {
}
s.Queries[i].statement = stmt
}
}
// Start opens the database handle, tests connectivity and prepares the
// configured query statements.
//
// A failed connectivity test aborts startup only when
// disconnected_servers_behavior is "error". For any other setting the failure
// is logged and Start returns nil without preparing statements; Gather pings
// again for as long as the server has not been marked as connected.
func (s *SQL) Start(_ telegraf.Accumulator) error {
	if err := s.setupConnection(); err != nil {
		return err
	}

	if err := s.ping(); err != nil {
		if s.DisconnectedServersBehavior == "error" {
			return err
		}
		// ping already wraps its error with "unable to connect to database",
		// so use a distinct prefix rather than repeating that text in the log.
		s.Log.Errorf("Ignoring unreachable server during startup: %v", err)
	}

	// ping sets serverConnected on success; statements are only prepared
	// against a server that answered the connectivity test.
	if s.serverConnected {
		s.prepareStatements()
	}

	return nil
}
@ -425,6 +460,16 @@ func (s *SQL) Stop() {
}
func (s *SQL) Gather(acc telegraf.Accumulator) error {
// During plugin startup the server may have been unreachable, so try pinging
// it again in this collection cycle. The statements only need to be prepared
// once, after the first successful ping.
if !s.serverConnected {
if err := s.ping(); err != nil {
return err
}
s.prepareStatements()
}
var wg sync.WaitGroup
tstart := time.Now()
for _, query := range s.Queries {