feat(inputs.pgbouncer): Added show_commands to select the collected pgbouncer metrics (#13436)

This commit is contained in:
Aleks Vagachev 2023-07-05 17:11:11 +03:00 committed by GitHub
parent ae163536e6
commit be2f950a3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 334 additions and 73 deletions

View File

@ -30,6 +30,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## All connection parameters are optional.
##
address = "host=localhost user=pgbouncer sslmode=disable"
## Specify which "show" commands to gather metrics for.
## Choose from: "stats", "pools", "lists", "databases"
# show_commands = ["stats", "pools"]
```
### `address`
@ -89,9 +93,45 @@ the server and doesn't restrict the databases we are trying to grab metrics for.
- sv_tested
- sv_used
- pgbouncer_lists
- tags:
- db
- server
- user
- fields:
- databases
- users
- pools
- free_clients
- used_clients
- login_clients
- free_servers
- used_servers
- dns_names
- dns_zones
- dns_queries
- pgbouncer_databases
- tags:
- db
- pg_dbname
- server
- user
- fields:
- current_connections
- pool_size
- min_pool_size
- reserve_pool
- max_connections
- paused
- disabled
## Example Output
```text
pgbouncer,db=pgbouncer,server=host\=debian-buster-postgres\ user\=dbn\ port\=6432\ dbname\=pgbouncer\ avg_query_count=0i,avg_query_time=0i,avg_wait_time=0i,avg_xact_count=0i,avg_xact_time=0i,total_query_count=26i,total_query_time=0i,total_received=0i,total_sent=0i,total_wait_time=0i,total_xact_count=26i,total_xact_time=0i 1581569936000000000
pgbouncer_pools,db=pgbouncer,pool_mode=statement,server=host\=debian-buster-postgres\ user\=dbn\ port\=6432\ dbname\=pgbouncer\ ,user=pgbouncer cl_active=1i,cl_waiting=0i,maxwait=0i,maxwait_us=0i,sv_active=0i,sv_idle=0i,sv_login=0i,sv_tested=0i,sv_used=0i 1581569936000000000
pgbouncer_lists,db=pgbouncer,server=host\=debian-buster-postgres\ user\=dbn\ port\=6432\ dbname\=pgbouncer\ ,user=pgbouncer databases=1i,dns_names=0i,dns_queries=0i,dns_zones=0i,free_clients=47i,free_servers=0i,login_clients=0i,pools=1i,used_clients=3i,used_servers=0i,users=4i 1581569936000000000
pgbouncer_databases,db=pgbouncer,pg_dbname=pgbouncer,server=host\=debian-buster-postgres\ user\=dbn\ port\=6432\ dbname\=pgbouncer\ name=pgbouncer disabled=0i,pool_size=2i,current_connections=0i,min_pool_size=0i,reserve_pool=0i,max_connections=0i,paused=0i 1581569936000000000
pgbouncer_databases,db=postgres,pg_dbname=postgres,server=host\=debian-buster-postgres\ user\=dbn\ port\=6432\ dbname\=pgbouncer\ name=postgres current_connections=0i,disabled=0i,pool_size=20i,min_pool_size=0i,reserve_pool=0i,paused=0i,max_connections=0i 1581569936000000000
```

View File

@ -4,6 +4,7 @@ package pgbouncer
import (
"bytes"
_ "embed"
"fmt"
"strconv"
// Required for SQL framework driver
@ -20,10 +21,12 @@ var sampleConfig string
type PgBouncer struct {
postgresql.Service
ShowCommands []string `toml:"show_commands"`
}
var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true,
"avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true,
"force_user": true, "host": true, "port": true, "name": true,
}
func (*PgBouncer) SampleConfig() string {
@ -31,29 +34,108 @@ func (*PgBouncer) SampleConfig() string {
}
func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
var (
err error
query string
columns []string
)
if len(p.ShowCommands) == 0 {
if err := p.showStats(acc); err != nil {
return err
}
query = `SHOW STATS`
if err := p.showPools(acc); err != nil {
return err
}
} else {
for _, cmd := range p.ShowCommands {
switch {
case cmd == "stats":
if err := p.showStats(acc); err != nil {
return err
}
case cmd == "pools":
if err := p.showPools(acc); err != nil {
return err
}
case cmd == "lists":
if err := p.showLists(acc); err != nil {
return err
}
case cmd == "databases":
if err := p.showDatabase(acc); err != nil {
return err
}
}
}
}
rows, err := p.DB.Query(query)
return nil
}
type scanner interface {
Scan(dest ...interface{}) error
}
func (p *PgBouncer) accRow(row scanner, columns []string) (map[string]string, map[string]*interface{}, error) {
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range columns {
columnMap[column] = new(interface{})
}
columnVars := make([]interface{}, 0, len(columnMap))
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[columns[i]])
}
// deconstruct array of variables and send to Scan
err := row.Scan(columnVars...)
if err != nil {
return err
return nil, nil, fmt.Errorf("couldn't copy the data: %w", err)
}
if columnMap["database"] != nil {
// extract the database name from the column map
name, ok := (*columnMap["database"]).(string)
if !ok {
return nil, nil, fmt.Errorf("database not a string, but %T", *columnMap["database"])
}
_, err := dbname.WriteString(name)
if err != nil {
return nil, nil, fmt.Errorf("writing database name failed: %w", err)
}
} else {
_, err := dbname.WriteString("postgres")
if err != nil {
return nil, nil, fmt.Errorf("writing 'postgres' failed: %w", err)
}
}
var tagAddress string
tagAddress, err = p.SanitizedAddress()
if err != nil {
return nil, nil, fmt.Errorf("couldn't get connection data: %w", err)
}
// Return basic tags and the mapped columns
return map[string]string{"server": tagAddress, "db": dbname.String()}, columnMap, nil
}
func (p *PgBouncer) showStats(acc telegraf.Accumulator) error {
// STATS
rows, err := p.DB.Query(`SHOW STATS`)
if err != nil {
return fmt.Errorf("execution error 'show stats': %w", err)
}
defer rows.Close()
// grab the column information from the result
if columns, err = rows.Columns(); err != nil {
return err
columns, err := rows.Columns()
if err != nil {
return fmt.Errorf("don't get column names 'show stats': %w", err)
}
for rows.Next() {
tags, columnMap, err := p.accRow(rows, columns)
if err != nil {
return err
}
@ -73,7 +155,7 @@ func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
// Integer fields are returned in pgbouncer 1.12
integer, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return err
return fmt.Errorf("couldn't convert metrics 'show stats': %w", err)
}
fields[col] = integer
@ -82,23 +164,22 @@ func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
acc.AddFields("pgbouncer", fields, tags)
}
err = rows.Err()
if err != nil {
return err
}
return rows.Err()
}
query = `SHOW POOLS`
poolRows, err := p.DB.Query(query)
func (p *PgBouncer) showPools(acc telegraf.Accumulator) error {
// POOLS
poolRows, err := p.DB.Query(`SHOW POOLS`)
if err != nil {
return err
return fmt.Errorf("execution error 'show pools': %w", err)
}
defer poolRows.Close()
// grab the column information from the result
if columns, err = poolRows.Columns(); err != nil {
return err
columns, err := poolRows.Columns()
if err != nil {
return fmt.Errorf("don't get column names 'show pools': %w", err)
}
for poolRows.Next() {
@ -132,52 +213,92 @@ func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
return poolRows.Err()
}
type scanner interface {
Scan(dest ...interface{}) error
func (p *PgBouncer) showLists(acc telegraf.Accumulator) error {
// LISTS
rows, err := p.DB.Query(`SHOW LISTS`)
if err != nil {
return fmt.Errorf("execution error 'show lists': %w", err)
}
defer rows.Close()
// grab the column information from the result
columns, err := rows.Columns()
if err != nil {
return fmt.Errorf("don't get column names 'show lists': %w", err)
}
fields := make(map[string]interface{})
tags := make(map[string]string)
for rows.Next() {
tag, columnMap, err := p.accRow(rows, columns)
if err != nil {
return err
}
name, ok := (*columnMap["list"]).(string)
if !ok {
return fmt.Errorf("metric name(show lists) not a string, but %T", *columnMap["list"])
}
if name != "dns_pending" {
value, ok := (*columnMap["items"]).(int64)
if !ok {
return fmt.Errorf("metric value(show lists) not a int64, but %T", *columnMap["items"])
}
fields[name] = value
tags = tag
}
}
acc.AddFields("pgbouncer_lists", fields, tags)
return rows.Err()
}
func (p *PgBouncer) accRow(row scanner, columns []string) (map[string]string,
map[string]*interface{}, error) {
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range columns {
columnMap[column] = new(interface{})
}
columnVars := make([]interface{}, 0, len(columnMap))
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[columns[i]])
}
// deconstruct array of variables and send to Scan
err := row.Scan(columnVars...)
func (p *PgBouncer) showDatabase(acc telegraf.Accumulator) error {
// DATABASES
rows, err := p.DB.Query(`SHOW DATABASES`)
if err != nil {
return nil, nil, err
}
if columnMap["database"] != nil {
// extract the database name from the column map
if _, err := dbname.WriteString((*columnMap["database"]).(string)); err != nil {
return nil, nil, err
}
} else {
if _, err := dbname.WriteString("postgres"); err != nil {
return nil, nil, err
}
return fmt.Errorf("execution error 'show database': %w", err)
}
defer rows.Close()
var tagAddress string
tagAddress, err = p.SanitizedAddress()
// grab the column information from the result
columns, err := rows.Columns()
if err != nil {
return nil, nil, err
return fmt.Errorf("don't get column names 'show database': %w", err)
}
// Return basic tags and the mapped columns
return map[string]string{"server": tagAddress, "db": dbname.String()}, columnMap, nil
for rows.Next() {
tags, columnMap, err := p.accRow(rows, columns)
if err != nil {
return err
}
// SHOW DATABASES displays pgbouncer database name under name column,
// while using database column to store Postgres database name.
if database, ok := columnMap["database"]; ok {
if s, ok := (*database).(string); ok && s != "" {
tags["pg_dbname"] = s
}
}
// pass it under db tag to be compatible with the rest of the measurements
if name, ok := columnMap["name"]; ok {
if s, ok := (*name).(string); ok && s != "" {
tags["db"] = s
}
}
fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
if !ignore {
fields[col] = *val
}
}
acc.AddFields("pgbouncer_databases", fields, tags)
}
return rows.Err()
}
func init() {

View File

@ -34,7 +34,7 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
defer backend.Terminate()
container := testutil.Container{
Image: "z9pascal/pgbouncer-container:1.17.0-latest",
Image: "z9pascal/pgbouncer-container:1.18.0-latest",
ExposedPorts: []string{pgBouncerServicePort},
Env: map[string]string{
"PG_ENV_POSTGRESQL_USER": "pgbouncer",
@ -66,13 +66,6 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
require.NoError(t, p.Start(&acc))
require.NoError(t, p.Gather(&acc))
// Return value of pgBouncer
// [pgbouncer map[db:pgbouncer server:host=localhost user=pgbouncer dbname=pgbouncer port=6432 ]
// map[avg_query_count:0 avg_query_time:0 avg_wait_time:0 avg_xact_count:0 avg_xact_time:0 total_query_count:3 total_query_time:0 total_received:0
// total_sent:0 total_wait_time:0 total_xact_count:3 total_xact_time:0] 1620163750039747891 pgbouncer_pools map[db:pgbouncer pool_mode:statement
// server:host=localhost user=pgbouncer dbname=pgbouncer port=6432 user:pgbouncer] map[cl_active:1 cl_waiting:0 maxwait:0 maxwait_us:0
// sv_active:0 sv_idle:0 sv_login:0 sv_tested:0 sv_used:0] 1620163750041444466]
intMetricsPgBouncer := []string{
"total_received",
"total_sent",
@ -93,8 +86,6 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
"maxwait",
}
int32Metrics := []string{}
metricsCounted := 0
for _, metric := range intMetricsPgBouncer {
@ -107,11 +98,116 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
metricsCounted++
}
for _, metric := range int32Metrics {
require.True(t, acc.HasInt32Field("pgbouncer", metric))
require.True(t, metricsCounted > 0)
require.Equal(t, len(intMetricsPgBouncer)+len(intMetricsPgBouncerPools), metricsCounted)
}
func TestPgBouncerGeneratesMetricsIntegrationShowCommands(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
postgresServicePort := "5432"
pgBouncerServicePort := "6432"
backend := testutil.Container{
Image: "postgres:alpine",
ExposedPorts: []string{postgresServicePort},
Env: map[string]string{
"POSTGRES_HOST_AUTH_METHOD": "trust",
},
WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
}
err := backend.Start()
require.NoError(t, err, "failed to start container")
defer backend.Terminate()
container := testutil.Container{
Image: "z9pascal/pgbouncer-container:1.18.0-latest",
ExposedPorts: []string{pgBouncerServicePort},
Env: map[string]string{
"PG_ENV_POSTGRESQL_USER": "pgbouncer",
"PG_ENV_POSTGRESQL_PASS": "pgbouncer",
},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(pgBouncerServicePort)),
wait.ForLog("LOG process up"),
),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()
addr := fmt.Sprintf(
"host=%s user=pgbouncer password=pgbouncer dbname=pgbouncer port=%s sslmode=disable",
container.Address,
container.Ports[pgBouncerServicePort],
)
p := &PgBouncer{
Service: postgresql.Service{
Address: config.NewSecret([]byte(addr)),
IsPgBouncer: true,
},
ShowCommands: []string{"pools", "lists", "databases"},
}
var acc testutil.Accumulator
require.NoError(t, p.Start(&acc))
require.NoError(t, p.Gather(&acc))
intMetricsPgBouncerPools := []string{
"cl_active",
"cl_waiting",
"sv_active",
"sv_idle",
"sv_used",
"sv_tested",
"sv_login",
"maxwait",
}
intMetricsPgBouncerLists := []string{
"databases",
"users",
"pools",
"free_clients",
"used_clients",
"login_clients",
"free_servers",
"used_servers",
"dns_names",
"dns_zones",
"dns_queries",
}
intMetricsPgBouncerDatabases := []string{
"pool_size",
"min_pool_size",
"reserve_pool",
"max_connections",
"current_connections",
"paused",
"disabled",
}
metricsCounted := 0
for _, metric := range intMetricsPgBouncerPools {
require.True(t, acc.HasInt64Field("pgbouncer_pools", metric))
metricsCounted++
}
for _, metric := range intMetricsPgBouncerLists {
require.True(t, acc.HasInt64Field("pgbouncer_lists", metric))
metricsCounted++
}
for _, metric := range intMetricsPgBouncerDatabases {
require.True(t, acc.HasInt64Field("pgbouncer_databases", metric))
metricsCounted++
}
require.True(t, metricsCounted > 0)
require.Equal(t, len(intMetricsPgBouncer)+len(intMetricsPgBouncerPools)+len(int32Metrics), metricsCounted)
require.Equal(t, len(intMetricsPgBouncerPools)+len(intMetricsPgBouncerLists)+len(intMetricsPgBouncerDatabases), metricsCounted)
}

View File

@ -9,3 +9,7 @@
## All connection parameters are optional.
##
address = "host=localhost user=pgbouncer sslmode=disable"
## Specify which "show" commands to gather metrics for.
## Choose from: "stats", "pools", "lists", "databases"
# show_commands = ["stats", "pools"]