feat(outputs.postgresql): Allow configuration of startup error handling (#15073)
This commit is contained in:
parent
4613c81be7
commit
506698056e
|
|
@ -12,6 +12,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->
|
||||||
|
|
||||||
|
In addition to the plugin-specific and global configuration settings the plugin
|
||||||
|
supports options for specifying the behavior when experiencing startup errors
|
||||||
|
using the `startup_error_behavior` setting. Available values are:
|
||||||
|
|
||||||
|
- `error`: Telegraf will stop and exit in case of startup errors. This is the
|
||||||
|
default behavior.
|
||||||
|
- `ignore`: Telegraf will ignore startup errors for this plugin and disable it
|
||||||
|
but continues processing for all other plugins.
|
||||||
|
- `retry`: Telegraf will try to start up the plugin in every gather or write
|
||||||
|
cycle in case of startup errors. The plugin is disabled until
|
||||||
|
the startup succeeds.
|
||||||
|
|
||||||
## Secret-store support
|
## Secret-store support
|
||||||
|
|
||||||
This plugin supports secrets from secret-stores for the `connection` option.
|
This plugin supports secrets from secret-stores for the `connection` option.
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/models"
|
"github.com/influxdata/telegraf/models"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
|
||||||
|
|
@ -72,33 +73,8 @@ type Postgresql struct {
|
||||||
tagsJSONColumn utils.Column
|
tagsJSONColumn utils.Column
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func (p *Postgresql) SampleConfig() string {
|
||||||
outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
|
return sampleConfig
|
||||||
}
|
|
||||||
|
|
||||||
func newPostgresql() *Postgresql {
|
|
||||||
p := &Postgresql{
|
|
||||||
Schema: "public",
|
|
||||||
TagTableSuffix: "_tag",
|
|
||||||
TagCacheSize: 100000,
|
|
||||||
Uint64Type: PgNumeric,
|
|
||||||
CreateTemplates: []*sqltemplate.Template{{}},
|
|
||||||
AddColumnTemplates: []*sqltemplate.Template{{}},
|
|
||||||
TagTableCreateTemplates: []*sqltemplate.Template{{}},
|
|
||||||
TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
|
|
||||||
RetryMaxBackoff: config.Duration(time.Second * 15),
|
|
||||||
Logger: models.NewLogger("outputs", "postgresql", ""),
|
|
||||||
LogLevel: "warn",
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
|
|
||||||
_ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
|
|
||||||
_ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
|
|
||||||
_ = p.TagTableAddColumnTemplates[0].UnmarshalText(
|
|
||||||
[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
|
|
||||||
)
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Postgresql) Init() error {
|
func (p *Postgresql) Init() error {
|
||||||
|
|
@ -169,8 +145,6 @@ func (p *Postgresql) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Postgresql) SampleConfig() string { return sampleConfig }
|
|
||||||
|
|
||||||
// Connect establishes a connection to the target database and prepares the cache
|
// Connect establishes a connection to the target database and prepares the cache
|
||||||
func (p *Postgresql) Connect() error {
|
func (p *Postgresql) Connect() error {
|
||||||
// Yes, we're not supposed to store the context. However since we don't receive a context, we have to.
|
// Yes, we're not supposed to store the context. However since we don't receive a context, we have to.
|
||||||
|
|
@ -178,8 +152,11 @@ func (p *Postgresql) Connect() error {
|
||||||
var err error
|
var err error
|
||||||
p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig)
|
p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.Logger.Errorf("Couldn't connect to server\n%v", err)
|
p.dbContextCancel()
|
||||||
return err
|
return &internal.StartupError{
|
||||||
|
Err: err,
|
||||||
|
Retry: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p.tableManager = NewTableManager(p)
|
p.tableManager = NewTableManager(p)
|
||||||
|
|
||||||
|
|
@ -234,7 +211,9 @@ func (p *Postgresql) Close() error {
|
||||||
|
|
||||||
// Die!
|
// Die!
|
||||||
p.dbContextCancel()
|
p.dbContextCancel()
|
||||||
p.db.Close()
|
if p.db != nil {
|
||||||
|
p.db.Close()
|
||||||
|
}
|
||||||
p.tableManager = nil
|
p.tableManager = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -493,3 +472,32 @@ func (p *Postgresql) writeTagTable(ctx context.Context, db dbh, tableSource *Tab
|
||||||
ttsrc.UpdateCache()
|
ttsrc.UpdateCache()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newPostgresql() *Postgresql {
|
||||||
|
p := &Postgresql{
|
||||||
|
Schema: "public",
|
||||||
|
TagTableSuffix: "_tag",
|
||||||
|
TagCacheSize: 100000,
|
||||||
|
Uint64Type: PgNumeric,
|
||||||
|
CreateTemplates: []*sqltemplate.Template{{}},
|
||||||
|
AddColumnTemplates: []*sqltemplate.Template{{}},
|
||||||
|
TagTableCreateTemplates: []*sqltemplate.Template{{}},
|
||||||
|
TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
|
||||||
|
RetryMaxBackoff: config.Duration(time.Second * 15),
|
||||||
|
Logger: models.NewLogger("outputs", "postgresql", ""),
|
||||||
|
LogLevel: "warn",
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
|
||||||
|
_ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
|
||||||
|
_ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
|
||||||
|
_ = p.TagTableAddColumnTemplates[0].UnmarshalText(
|
||||||
|
[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
|
||||||
|
)
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,9 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/models"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
@ -272,6 +274,80 @@ func TestPostgresqlConnectIntegration(t *testing.T) {
|
||||||
require.EqualValues(t, 2, p.db.Stat().MaxConns())
|
require.EqualValues(t, 2, p.db.Stat().MaxConns())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConnectionIssueAtStartup(t *testing.T) {
|
||||||
|
// Test case for https://github.com/influxdata/telegraf/issues/14365
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
servicePort := "5432"
|
||||||
|
username := "postgres"
|
||||||
|
password := "postgres"
|
||||||
|
testDatabaseName := "telegraf_test"
|
||||||
|
|
||||||
|
container := testutil.Container{
|
||||||
|
Image: "postgres:alpine",
|
||||||
|
ExposedPorts: []string{servicePort},
|
||||||
|
Env: map[string]string{
|
||||||
|
"POSTGRES_USER": username,
|
||||||
|
"POSTGRES_PASSWORD": password,
|
||||||
|
"POSTGRES_DB": "telegraf_test",
|
||||||
|
},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
// the database comes up twice, once right away, then again a second
|
||||||
|
// time after the docker entrypoint starts configuration
|
||||||
|
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
require.NoError(t, container.Start(), "failed to start container")
|
||||||
|
defer container.Terminate()
|
||||||
|
|
||||||
|
// Pause the container for connectivity issues
|
||||||
|
require.NoError(t, container.Pause())
|
||||||
|
|
||||||
|
// Create a model to be able to use the startup retry strategy
|
||||||
|
dsn := config.NewSecret([]byte(fmt.Sprintf(
|
||||||
|
"host=%s port=%s user=%s password=%s dbname=%s connect_timeout=1",
|
||||||
|
container.Address,
|
||||||
|
container.Ports[servicePort],
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
testDatabaseName,
|
||||||
|
)))
|
||||||
|
defer dsn.Destroy()
|
||||||
|
plugin := newPostgresql()
|
||||||
|
plugin.Connection = dsn
|
||||||
|
plugin.Logger = NewLogAccumulator(t)
|
||||||
|
plugin.LogLevel = "debug"
|
||||||
|
model := models.NewRunningOutput(
|
||||||
|
plugin,
|
||||||
|
&models.OutputConfig{
|
||||||
|
Name: "postgres",
|
||||||
|
StartupErrorBehavior: "retry",
|
||||||
|
},
|
||||||
|
1000, 1000,
|
||||||
|
)
|
||||||
|
require.NoError(t, model.Init())
|
||||||
|
|
||||||
|
// The connect call should succeed even though the table creation was not
|
||||||
|
// successful due to the "retry" strategy
|
||||||
|
require.NoError(t, model.Connect())
|
||||||
|
|
||||||
|
// Writing the metrics in this state should fail because we are not fully
|
||||||
|
// started up
|
||||||
|
metrics := testutil.MockMetrics()
|
||||||
|
for _, m := range metrics {
|
||||||
|
model.AddMetric(m)
|
||||||
|
}
|
||||||
|
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)
|
||||||
|
|
||||||
|
// Unpause the container, now writes should succeed
|
||||||
|
require.NoError(t, container.Resume())
|
||||||
|
require.NoError(t, model.WriteBatch())
|
||||||
|
model.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func newMetric(
|
func newMetric(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
suffix string,
|
suffix string,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue