feat(outputs.cratedb): Allow configuration of startup error handling (#15065)

This commit is contained in:
Sven Rebhan 2024-03-27 17:12:38 +01:00 committed by GitHub
parent 212822e85d
commit 4f3f9c3fcf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 171 additions and 47 deletions

View File

@ -36,15 +36,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf ```toml @sample.conf
# Configuration for CrateDB to send metrics to. # Configuration for CrateDB to send metrics to.
[[outputs.cratedb]] [[outputs.cratedb]]
# A github.com/jackc/pgx/v4 connection string. ## Connection parameters for accessing the database see
# See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig ## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## for available options
url = "postgres://user:password@localhost/schema?sslmode=disable" url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s" ## Timeout for all CrateDB queries.
# Name of the table to store metrics in. # timeout = "5s"
table = "metrics"
# If true, and the metrics table does not exist, create it automatically. ## Name of the table to store metrics in.
table_create = true # table = "metrics"
# The character(s) to replace any '.' in an object key with
key_separator = "_" ## If true, and the metrics table does not exist, create it automatically.
# table_create = false
## The character(s) to replace any '.' in an object key with
# key_separator = "_"
``` ```

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@ -25,26 +26,8 @@ var sampleConfig string
const MaxInt64 = int64(^uint64(0) >> 1) const MaxInt64 = int64(^uint64(0) >> 1)
type CrateDB struct { const tableCreationQuery = `
URL string CREATE TABLE IF NOT EXISTS %s (
Timeout config.Duration
Table string
TableCreate bool `toml:"table_create"`
KeySeparator string `toml:"key_separator"`
DB *sql.DB
}
func (*CrateDB) SampleConfig() string {
return sampleConfig
}
func (c *CrateDB) Connect() error {
db, err := sql.Open("pgx", c.URL)
if err != nil {
return err
} else if c.TableCreate {
query := `
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
"hash_id" LONG INDEX OFF, "hash_id" LONG INDEX OFF,
"timestamp" TIMESTAMP, "timestamp" TIMESTAMP,
"name" STRING, "name" STRING,
@ -54,13 +37,52 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
PRIMARY KEY ("timestamp", "hash_id","day") PRIMARY KEY ("timestamp", "hash_id","day")
) PARTITIONED BY("day"); ) PARTITIONED BY("day");
` `
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
defer cancel() type CrateDB struct {
if _, err := db.ExecContext(ctx, query); err != nil { URL string `toml:"url"`
Timeout config.Duration `toml:"timeout"`
Table string `toml:"table"`
TableCreate bool `toml:"table_create"`
KeySeparator string `toml:"key_separator"`
db *sql.DB
}
func (*CrateDB) SampleConfig() string {
return sampleConfig
}
func (c *CrateDB) Init() error {
// Set defaults
if c.KeySeparator == "" {
c.KeySeparator = "_"
}
if c.Table == "" {
c.Table = "metrics"
}
return nil
}
func (c *CrateDB) Connect() error {
if c.db == nil {
db, err := sql.Open("pgx", c.URL)
if err != nil {
return err return err
} }
c.db = db
} }
c.DB = db
if c.TableCreate {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
defer cancel()
query := fmt.Sprintf(tableCreationQuery, c.Table)
if _, err := c.db.ExecContext(ctx, query); err != nil {
return &internal.StartupError{Err: err, Retry: true}
}
}
return nil return nil
} }
@ -73,7 +95,7 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error {
return err return err
} }
_, err = c.DB.ExecContext(ctx, generatedSQL) _, err = c.db.ExecContext(ctx, generatedSQL)
if err != nil { if err != nil {
return err return err
} }
@ -225,7 +247,10 @@ func hashID(m telegraf.Metric) int64 {
} }
func (c *CrateDB) Close() error { func (c *CrateDB) Close() error {
return c.DB.Close() if c.db == nil {
return nil
}
return c.db.Close()
} }
func init() { func init() {

View File

@ -12,7 +12,9 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -46,7 +48,6 @@ func TestConnectAndWriteIntegration(t *testing.T) {
defer container.Terminate() defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort]) url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])
fmt.Println(url)
db, err := sql.Open("pgx", url) db, err := sql.Open("pgx", url)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -77,6 +78,76 @@ func TestConnectAndWriteIntegration(t *testing.T) {
require.NoError(t, c.Close()) require.NoError(t, c.Close())
} }
func TestConnectionIssueAtStartup(t *testing.T) {
// Test case for https://github.com/influxdata/telegraf/issues/13278
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "crate",
ExposedPorts: []string{servicePort},
Entrypoint: []string{
"/docker-entrypoint.sh",
"-Cdiscovery.type=single-node",
},
WaitingFor: wait.ForAll(
wait.ForListeningPort(servicePort),
wait.ForLog("recovered [0] indices into cluster_state"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])
// Pause the container for connectivity issues
require.NoError(t, container.Pause())
// Create a model to be able to use the startup retry strategy
plugin := &CrateDB{
URL: url,
Table: "testing",
Timeout: config.Duration(time.Second * 5),
TableCreate: true,
}
model := models.NewRunningOutput(
plugin,
&models.OutputConfig{
Name: "cratedb",
StartupErrorBehavior: "retry",
},
1000, 1000,
)
require.NoError(t, model.Init())
// The connect call should succeed even though the table creation was not
// successful due to the "retry" strategy
require.NoError(t, model.Connect())
// Writing the metrics in this state should fail because we are not fully
// started up
metrics := testutil.MockMetrics()
for _, m := range metrics {
model.AddMetric(m)
}
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)
// Unpause the container, now writes should succeed
require.NoError(t, container.Resume())
require.NoError(t, model.WriteBatch())
defer model.Close()
// Verify that the metrics were actually written
for _, m := range metrics {
mid := hashID(m)
row := plugin.db.QueryRow("SELECT hash_id FROM testing WHERE hash_id = ? AND timestamp = ?", mid, m.Time())
var id int64
require.NoError(t, row.Scan(&id))
require.Equal(t, id, mid)
}
}
func TestInsertSQL(t *testing.T) { func TestInsertSQL(t *testing.T) {
tests := []struct { tests := []struct {
Metrics []telegraf.Metric Metrics []telegraf.Metric

View File

@ -1,13 +1,18 @@
# Configuration for CrateDB to send metrics to. # Configuration for CrateDB to send metrics to.
[[outputs.cratedb]] [[outputs.cratedb]]
# A github.com/jackc/pgx/v4 connection string. ## Connection parameters for accessing the database see
# See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig ## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## for available options
url = "postgres://user:password@localhost/schema?sslmode=disable" url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s" ## Timeout for all CrateDB queries.
# Name of the table to store metrics in. # timeout = "5s"
table = "metrics"
# If true, and the metrics table does not exist, create it automatically. ## Name of the table to store metrics in.
table_create = true # table = "metrics"
# The character(s) to replace any '.' in an object key with
key_separator = "_" ## If true, and the metrics table does not exist, create it automatically.
# table_create = false
## The character(s) to replace any '.' in an object key with
# key_separator = "_"

View File

@ -156,3 +156,21 @@ func (c *Container) Terminate() {
c.PrintLogs() c.PrintLogs()
} }
func (c *Container) Pause() error {
provider, err := testcontainers.NewDockerProvider()
if err != nil {
return fmt.Errorf("getting provider failed: %w", err)
}
return provider.Client().ContainerPause(c.ctx, c.container.GetContainerID())
}
func (c *Container) Resume() error {
provider, err := testcontainers.NewDockerProvider()
if err != nil {
return fmt.Errorf("getting provider failed: %w", err)
}
return provider.Client().ContainerUnpause(c.ctx, c.container.GetContainerID())
}