fix(inputs.postgresql): Default database definition (#13602)
This commit is contained in:
parent
f001b29eee
commit
b20c0d7fad
|
|
@ -103,9 +103,9 @@ func (p *PgBouncer) accRow(row scanner, columns []string) (map[string]string, ma
|
||||||
return nil, nil, fmt.Errorf("writing database name failed: %w", err)
|
return nil, nil, fmt.Errorf("writing database name failed: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
_, err := dbname.WriteString("postgres")
|
_, err := dbname.WriteString("pgbouncer")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("writing 'postgres' failed: %w", err)
|
return nil, nil, fmt.Errorf("writing 'pgbouncer' failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -120,8 +120,13 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator, columns []str
|
||||||
columnVars = append(columnVars, columnMap[columns[i]])
|
columnVars = append(columnVars, columnMap[columns[i]])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tagAddress, err := p.SanitizedAddress()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// deconstruct array of variables and send to Scan
|
// deconstruct array of variables and send to Scan
|
||||||
err := row.Scan(columnVars...)
|
err = row.Scan(columnVars...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -139,15 +144,13 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator, columns []str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := dbname.WriteString("postgres"); err != nil {
|
database, err := p.GetConnectDatabase(tagAddress)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := dbname.WriteString(database); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
var tagAddress string
|
|
||||||
tagAddress, err = p.SanitizedAddress()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"server": tagAddress, "db": dbname.String()}
|
tags := map[string]string{"server": tagAddress, "db": dbname.String()}
|
||||||
|
|
|
||||||
|
|
@ -174,3 +174,17 @@ func (p *Service) SanitizedAddress() (sanitizedAddress string, err error) {
|
||||||
|
|
||||||
return kvMatcher.ReplaceAllString(canonicalizedAddress, ""), nil
|
return kvMatcher.ReplaceAllString(canonicalizedAddress, ""), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetConnectDatabase utility function for getting the database to which the connection was made
|
||||||
|
func (p *Service) GetConnectDatabase(connectionString string) (string, error) {
|
||||||
|
connConfig, err := pgx.ParseConfig(connectionString)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("connection string parsing failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(connConfig.Database) != 0 {
|
||||||
|
return connConfig.Database, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "postgres", nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -186,6 +186,10 @@ func (p *Postgresql) accRow(measName string, row scanner, acc telegraf.Accumulat
|
||||||
columnVars = append(columnVars, columnMap[columns[i]])
|
columnVars = append(columnVars, columnMap[columns[i]])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tagAddress, err = p.SanitizedAddress(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// deconstruct array of variables and send to Scan
|
// deconstruct array of variables and send to Scan
|
||||||
if err = row.Scan(columnVars...); err != nil {
|
if err = row.Scan(columnVars...); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -199,18 +203,22 @@ func (p *Postgresql) accRow(measName string, row scanner, acc telegraf.Accumulat
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if _, err := dbname.WriteString("postgres"); err != nil {
|
database, err := p.GetConnectDatabase(tagAddress)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := dbname.WriteString(database); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := dbname.WriteString("postgres"); err != nil {
|
database, err := p.GetConnectDatabase(tagAddress)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := dbname.WriteString(database); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if tagAddress, err = p.SanitizedAddress(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the additional tags
|
// Process the additional tags
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue