feat(outputs.postgresql): Add secret store support (#15041)
This commit is contained in:
parent
a6c3ae5238
commit
57ed9fd330
|
|
@ -12,6 +12,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Secret-store support
|
||||||
|
|
||||||
|
This plugin supports secrets from secret-stores for the `connection` option.
|
||||||
|
See the [secret-store documentation][SECRETSTORE] for more details on how
|
||||||
|
to use them.
|
||||||
|
|
||||||
|
[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
```toml @sample.conf
|
```toml @sample.conf
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ type dbh interface {
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
type Postgresql struct {
|
type Postgresql struct {
|
||||||
Connection string `toml:"connection"`
|
Connection config.Secret `toml:"connection"`
|
||||||
Schema string `toml:"schema"`
|
Schema string `toml:"schema"`
|
||||||
TagsAsForeignKeys bool `toml:"tags_as_foreign_keys"`
|
TagsAsForeignKeys bool `toml:"tags_as_foreign_keys"`
|
||||||
TagTableSuffix string `toml:"tag_table_suffix"`
|
TagTableSuffix string `toml:"tag_table_suffix"`
|
||||||
|
|
@ -130,11 +130,17 @@ func (p *Postgresql) Init() error {
|
||||||
p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType}
|
p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType}
|
||||||
p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType}
|
p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType}
|
||||||
|
|
||||||
var err error
|
connectionSecret, err := p.Connection.Get()
|
||||||
if p.dbConfig, err = pgxpool.ParseConfig(p.Connection); err != nil {
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting address failed: %w", err)
|
||||||
|
}
|
||||||
|
connection := connectionSecret.String()
|
||||||
|
defer connectionSecret.Destroy()
|
||||||
|
|
||||||
|
if p.dbConfig, err = pgxpool.ParseConfig(connection); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
parsedConfig, _ := pgx.ParseConfig(p.Connection)
|
parsedConfig, _ := pgx.ParseConfig(connection)
|
||||||
if _, ok := parsedConfig.Config.RuntimeParams["pool_max_conns"]; !ok {
|
if _, ok := parsedConfig.Config.RuntimeParams["pool_max_conns"]; !ok {
|
||||||
// The pgx default for pool_max_conns is 4. However we want to default to 1.
|
// The pgx default for pool_max_conns is 4. However we want to default to 1.
|
||||||
p.dbConfig.MaxConns = 1
|
p.dbConfig.MaxConns = 1
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkPostgresql_sequential(b *testing.B) {
|
func BenchmarkPostgresql_sequential(b *testing.B) {
|
||||||
|
|
@ -23,7 +25,12 @@ func BenchmarkPostgresql_concurrent(b *testing.B) {
|
||||||
|
|
||||||
func benchmarkPostgresql(b *testing.B, gen <-chan []telegraf.Metric, concurrency int, foreignTags bool) {
|
func benchmarkPostgresql(b *testing.B, gen <-chan []telegraf.Metric, concurrency int, foreignTags bool) {
|
||||||
p := newPostgresqlTest(b)
|
p := newPostgresqlTest(b)
|
||||||
p.Connection += fmt.Sprintf(" pool_max_conns=%d", concurrency)
|
|
||||||
|
connection, err := p.Connection.Get()
|
||||||
|
require.NoError(b, err)
|
||||||
|
p.Connection = config.NewSecret([]byte(connection.String() + fmt.Sprintf(" pool_max_conns=%d", concurrency)))
|
||||||
|
connection.Destroy()
|
||||||
|
|
||||||
p.TagsAsForeignKeys = foreignTags
|
p.TagsAsForeignKeys = foreignTags
|
||||||
p.LogLevel = ""
|
p.LogLevel = ""
|
||||||
_ = p.Init()
|
_ = p.Init()
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -231,7 +232,7 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
|
||||||
require.NoError(tb, err, "failed to start container")
|
require.NoError(tb, err, "failed to start container")
|
||||||
|
|
||||||
p := newPostgresql()
|
p := newPostgresql()
|
||||||
p.Connection = fmt.Sprintf(
|
connection := fmt.Sprintf(
|
||||||
"host=%s port=%s user=%s password=%s dbname=%s",
|
"host=%s port=%s user=%s password=%s dbname=%s",
|
||||||
container.Address,
|
container.Address,
|
||||||
container.Ports[servicePort],
|
container.Ports[servicePort],
|
||||||
|
|
@ -239,6 +240,7 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
|
||||||
password,
|
password,
|
||||||
testDatabaseName,
|
testDatabaseName,
|
||||||
)
|
)
|
||||||
|
p.Connection = config.NewSecret([]byte(connection))
|
||||||
logger := NewLogAccumulator(tb)
|
logger := NewLogAccumulator(tb)
|
||||||
p.Logger = logger
|
p.Logger = logger
|
||||||
p.LogLevel = "debug"
|
p.LogLevel = "debug"
|
||||||
|
|
@ -260,7 +262,11 @@ func TestPostgresqlConnectIntegration(t *testing.T) {
|
||||||
require.EqualValues(t, 1, p.db.Stat().MaxConns())
|
require.EqualValues(t, 1, p.db.Stat().MaxConns())
|
||||||
|
|
||||||
p = newPostgresqlTest(t)
|
p = newPostgresqlTest(t)
|
||||||
p.Connection += " pool_max_conns=2"
|
connection, err := p.Connection.Get()
|
||||||
|
require.NoError(t, err)
|
||||||
|
p.Connection = config.NewSecret([]byte(connection.String() + " pool_max_conns=2"))
|
||||||
|
connection.Destroy()
|
||||||
|
|
||||||
_ = p.Init()
|
_ = p.Init()
|
||||||
require.NoError(t, p.Connect())
|
require.NoError(t, p.Connect())
|
||||||
require.EqualValues(t, 2, p.db.Stat().MaxConns())
|
require.EqualValues(t, 2, p.db.Stat().MaxConns())
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue