feat(outputs.sql): Add option to automate table schema updates (#16544)

This commit is contained in:
hautecodure 2025-04-01 21:51:20 +03:00 committed by GitHub
parent 080e9a1338
commit 49dc1270a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 478 additions and 10 deletions

View File

@ -40,8 +40,8 @@ driver selected.
Through the nature of the inputs plugins, the amounts of columns inserted within
rows for a given metric may differ. Since the tables are created based on the
tags and fields available within an input metric, it's possible the created
table won't contain all the necessary columns. You might need to initialize
the schema yourself, to avoid this scenario.
table won't contain all the necessary columns. If you wish to automate table
updates, check out the [Schema updates](#schema-updates) section for more info.
## Advanced options
@ -110,6 +110,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
## Table update template, available template variables:
## {TABLE} - table name as a quoted identifier
## {COLUMN} - column definition (quoted identifier and type)
## NOTE: Ensure the user you're using to write to the database has the necessary permissions
##
## Use the following setting for automatically adding columns:
## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
# table_update_template = ""
## Initialization SQL
# init_sql = ""
@ -155,6 +164,26 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# # conversion_style = "unsigned_suffix"
```
## Schema updates
The default behavior of this plugin is to create a schema for the table,
based on the current metric (for both fields and tags). However, writing
subsequent metrics with additional fields or tags will result in errors.
If you wish the plugin to sync the column-schema for every metric,
specify the `table_update_template` setting in your config file.
> [!NOTE]
> The following snippet contains a generic query that your
> database may (or may not) support. Consult your database's
> documentation for proper syntax and table / column options.
```toml
# Save metrics to an SQL Database
[[outputs.sql]]
## Table update template
table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
```
## Driver-specific information
### go-sql-driver/mysql

View File

@ -29,6 +29,15 @@
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
## Table update template, available template variables:
## {TABLE} - table name as a quoted identifier
## {COLUMN} - column definition (quoted identifier and type)
## NOTE: Ensure the user you're using to write to the database has the necessary permissions
##
## Use the following setting for automatically adding columns:
## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
# table_update_template = ""
## Initialization SQL
# init_sql = ""

View File

@ -53,6 +53,7 @@ type SQL struct {
TimestampColumn string `toml:"timestamp_column"`
TableTemplate string `toml:"table_template"`
TableExistsTemplate string `toml:"table_exists_template"`
TableUpdateTemplate string `toml:"table_update_template"`
InitSQL string `toml:"init_sql"`
Convert ConvertStruct `toml:"convert"`
ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"`
@ -61,8 +62,9 @@ type SQL struct {
ConnectionMaxOpen int `toml:"connection_max_open"`
Log telegraf.Logger `toml:"-"`
db *gosql.DB
tables map[string]bool
db *gosql.DB
tables map[string]map[string]bool
tableListColumnsTemplate string
}
func (*SQL) SampleConfig() string {
@ -83,6 +85,11 @@ func (p *SQL) Init() error {
}
}
p.tableListColumnsTemplate = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}"
if p.Driver == "sqlite" {
p.tableListColumnsTemplate = "SELECT name AS column_name FROM pragma_table_info({TABLE})"
}
// Check for a valid driver
switch p.Driver {
case "clickhouse":
@ -119,7 +126,7 @@ func (p *SQL) Connect() error {
}
p.db = db
p.tables = make(map[string]bool)
p.tables = make(map[string]map[string]bool)
return nil
}
@ -209,6 +216,14 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
return query
}
// generateAddColumn renders the configured table-update template into the
// DDL statement that adds a single column to an existing table. The
// {TABLE} placeholder receives the quoted table name and {COLUMN} the
// quoted column name followed by its SQL type.
func (p *SQL) generateAddColumn(tablename, column, columnType string) string {
	stmt := strings.ReplaceAll(p.TableUpdateTemplate, "{TABLE}", quoteIdent(tablename))
	return strings.ReplaceAll(stmt, "{COLUMN}", quoteIdent(column)+" "+columnType)
}
func (p *SQL) generateInsert(tablename string, columns []string) string {
placeholders := make([]string, 0, len(columns))
quotedColumns := make([]string, 0, len(columns))
@ -233,6 +248,50 @@ func (p *SQL) generateInsert(tablename string, columns []string) string {
strings.Join(placeholders, ","))
}
// createTable creates the table backing the given metric and seeds the
// in-memory column cache for it. When schema updates are enabled (a
// table_update_template is configured), the cache is filled with the
// columns actually present in the freshly created table.
func (p *SQL) createTable(metric telegraf.Metric) error {
	name := metric.Name()

	if _, err := p.db.Exec(p.generateCreateTable(metric)); err != nil {
		return fmt.Errorf("creating table failed: %w", err)
	}

	// Ensure compatibility: set the table cache to an empty map
	p.tables[name] = make(map[string]bool)

	// Modifying the table schema is opt-in
	if p.TableUpdateTemplate == "" {
		return nil
	}
	if err := p.updateTableCache(name); err != nil {
		return fmt.Errorf("updating table cache failed: %w", err)
	}
	return nil
}
// createColumn ensures the given column exists on the table, issuing an
// ALTER statement generated from the table-update template when it does
// not. The cached column list is consulted first so that no redundant DDL
// is executed, and it is refreshed after a successful addition.
//
// Improvement over the previous version: the table-existence lookup is
// performed once per path instead of three separate map accesses, and the
// redundant two-value check on the bool map is collapsed (absent keys read
// as false, which is exactly the "column missing" case).
func (p *SQL) createColumn(tablename, column, columnType string) error {
	// Populate the cache if we have no column information for this table yet.
	columns, cached := p.tables[tablename]
	if !cached {
		if err := p.updateTableCache(tablename); err != nil {
			return fmt.Errorf("updating table cache failed: %w", err)
		}
		// updateTableCache creates the entry on success; guard anyway so a
		// lookup below can never panic on a nil map.
		if columns, cached = p.tables[tablename]; !cached {
			return fmt.Errorf("table %s does not exist in cache", tablename)
		}
	}

	// Column already exists, nothing to do
	if columns[column] {
		return nil
	}

	// Generate and execute the column-addition statement
	stmt := p.generateAddColumn(tablename, column, columnType)
	if _, err := p.db.Exec(stmt); err != nil {
		return fmt.Errorf("creating column failed: %w", err)
	}

	// Refresh the cache so subsequent metrics see the new column
	if err := p.updateTableCache(tablename); err != nil {
		return fmt.Errorf("updating table cache failed: %w", err)
	}
	return nil
}
func (p *SQL) tableExists(tableName string) bool {
stmt := strings.ReplaceAll(p.TableExistsTemplate, "{TABLE}", quoteIdent(tableName))
@ -240,6 +299,33 @@ func (p *SQL) tableExists(tableName string) bool {
return err == nil
}
// updateTableCache queries the database for the columns of the given table
// and records them in the in-memory column cache (p.tables).
//
// Fix: the previous version never checked columns.Err() after the Next()
// loop, so an error that aborted iteration (e.g. a dropped connection) was
// silently treated as a complete, successful column listing.
func (p *SQL) updateTableCache(tablename string) error {
	stmt := strings.ReplaceAll(p.tableListColumnsTemplate, "{TABLE}", quoteStr(tablename))

	columns, err := p.db.Query(stmt)
	if err != nil {
		return fmt.Errorf("fetching columns for table(%s) failed: %w", tablename, err)
	}
	defer columns.Close()

	if p.tables[tablename] == nil {
		p.tables[tablename] = make(map[string]bool)
	}

	for columns.Next() {
		var columnName string
		if err := columns.Scan(&columnName); err != nil {
			return err
		}
		// Setting the value unconditionally is equivalent to checking first.
		p.tables[tablename][columnName] = true
	}
	// Surface any error that terminated the iteration; Next() returning
	// false alone does not distinguish "done" from "failed".
	if err := columns.Err(); err != nil {
		return fmt.Errorf("iterating columns for table(%s) failed: %w", tablename, err)
	}
	return nil
}
func (p *SQL) Write(metrics []telegraf.Metric) error {
var err error
@ -247,14 +333,11 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
tablename := metric.Name()
// create table if needed
if !p.tables[tablename] && !p.tableExists(tablename) {
createStmt := p.generateCreateTable(metric)
_, err := p.db.Exec(createStmt)
if err != nil {
if _, found := p.tables[tablename]; !found && !p.tableExists(tablename) {
if err := p.createTable(metric); err != nil {
return err
}
}
p.tables[tablename] = true
var columns []string
var values []interface{}
@ -274,6 +357,15 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
values = append(values, value)
}
// Modifying the table schema is opt-in
if p.TableUpdateTemplate != "" {
for i := range len(columns) {
if err := p.createColumn(tablename, columns[i], p.deriveDatatype(values[i])); err != nil {
return err
}
}
}
sql := p.generateInsert(tablename, columns)
switch p.Driver {

View File

@ -121,6 +121,26 @@ var (
ts,
),
}
// postCreateMetrics targets the same measurement ("metric_one") as
// testMetrics but carries a tag and a field that did not exist when the
// table was first created. It exercises the opt-in
// table_update_template column-addition path.
postCreateMetrics = []telegraf.Metric{
	stableMetric(
		"metric_one",
		[]telegraf.Tag{
			{
				Key:   "tag_add_after_create",
				Value: "tag2",
			},
		},
		[]telegraf.Field{
			{
				Key:   "bool_add_after_create",
				Value: true,
			},
		},
		ts,
		telegraf.Untyped,
	),
}
)
func TestMysqlIntegration(t *testing.T) {
@ -208,6 +228,90 @@ func TestMysqlIntegration(t *testing.T) {
}
}
// TestMysqlUpdateSchemeIntegration verifies that, with a
// table_update_template configured, writing a metric containing tags and
// fields unknown at table-creation time results in matching
// ALTER TABLE ... ADD COLUMN statements being executed against MariaDB.
func TestMysqlUpdateSchemeIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	initdb, err := filepath.Abs("testdata/mariadb/initdb/script.sql")
	require.NoError(t, err)

	// initdb/script.sql creates this database
	const dbname = "foo"

	// The mariadb image lets you set the root password through an env
	// var. We'll use root to insert and query test data.
	const username = "root"
	password := testutil.GetRandomString(32)
	outDir := t.TempDir()

	servicePort := "3306"
	container := testutil.Container{
		Image: "mariadb",
		Env: map[string]string{
			"MARIADB_ROOT_PASSWORD": password,
		},
		Files: map[string]string{
			"/docker-entrypoint-initdb.d/script.sql": initdb,
			"/out":                                   outDir,
		},
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			// Wait for the second "ready" message — presumably the image
			// restarts once after running the init scripts; verify against
			// the mariadb image docs.
			wait.ForLog("mariadbd: ready for connections.").WithOccurrence(2),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// use the plugin to write to the database
	address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
		username, password, container.Address, container.Ports[servicePort], dbname,
	)
	p := &SQL{
		Driver:         "mysql",
		DataSourceName: address,
		Convert:        defaultConvert,
		// ANSI_QUOTES makes MariaDB accept the double-quoted identifiers
		// the plugin generates.
		InitSQL:             "SET sql_mode='ANSI_QUOTES';",
		TimestampColumn:     "timestamp",
		ConnectionMaxIdle:   2,
		Log:                 testutil.Logger{},
		TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}",
	}
	require.NoError(t, p.Init())
	require.NoError(t, p.Connect())
	// NOTE(review): p.Close() is never invoked in this test (the Postgres
	// variant closes the plugin) — consider a deferred Close.
	require.NoError(t, p.Write(testMetrics))

	// Write a metric that targets the same table but has additional fields
	// to test the automatic column update functionality.
	require.NoError(t, p.Write(postCreateMetrics))

	// Expected column definitions as they appear in mariadb-dump output.
	fields := []string{
		"`tag_add_after_create` text DEFAULT NULL",
		"`bool_add_after_create` tinyint(1) DEFAULT NULL",
	}
	for _, column := range fields {
		require.Eventually(t, func() bool {
			rc, out, err := container.Exec([]string{
				"bash",
				"-c",
				"mariadb-dump --user=" + username +
					" --password=" + password +
					" --compact" +
					" --skip-opt " +
					dbname,
			})
			require.NoError(t, err)
			require.Equal(t, 0, rc)
			b, err := io.ReadAll(out)
			require.NoError(t, err)
			return bytes.Contains(b, []byte(column))
		}, 10*time.Second, 500*time.Millisecond, column)
	}
}
func TestPostgresIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -295,6 +399,100 @@ func TestPostgresIntegration(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond)
}
// TestPostgresUpdateSchemeIntegration verifies the table_update_template
// column-addition path against PostgreSQL: columns for tags/fields added
// after table creation must show up in pg_dump output.
func TestPostgresUpdateSchemeIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	initdb, err := filepath.Abs("testdata/postgres/initdb/init.sql")
	require.NoError(t, err)

	// initdb/init.sql creates this database
	const dbname = "foo"

	// default username for postgres is postgres
	const username = "postgres"
	password := testutil.GetRandomString(32)
	outDir := t.TempDir()

	servicePort := "5432"
	container := testutil.Container{
		Image: "postgres",
		Env: map[string]string{
			"POSTGRES_PASSWORD": password,
		},
		Files: map[string]string{
			"/docker-entrypoint-initdb.d/script.sql": initdb,
			"/out":                                   outDir,
		},
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			// Wait for the second "ready" message — presumably the image
			// restarts once after running the init scripts.
			wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// use the plugin to write to the database
	// host, port, username, password, dbname
	address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
		username, password, container.Address, container.Ports[servicePort], dbname,
	)
	p := &SQL{
		Driver:              "pgx",
		DataSourceName:      address,
		Convert:             defaultConvert,
		TimestampColumn:     "timestamp",
		ConnectionMaxIdle:   2,
		Log:                 testutil.Logger{},
		TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}",
	}
	// Postgres-specific type conversions.
	p.Convert.Real = "double precision"
	p.Convert.Unsigned = "bigint"
	p.Convert.ConversionStyle = "literal"
	require.NoError(t, p.Init())
	require.NoError(t, p.Connect())
	defer p.Close()
	require.NoError(t, p.Write(testMetrics))

	// Write a metric that targets the same table but has additional fields
	// to test the automatic column update functionality.
	require.NoError(t, p.Write(postCreateMetrics))
	// Close flushes the connection before the schema is inspected below.
	require.NoError(t, p.Close())

	// Expected column definitions as they appear in pg_dump output.
	fields := []string{
		"tag_add_after_create text",
		"bool_add_after_create boolean",
	}
	for _, column := range fields {
		require.Eventually(t, func() bool {
			rc, out, err := container.Exec([]string{
				"bash",
				"-c",
				"pg_dump" +
					" --username=" + username +
					" --no-comments" +
					" " + dbname +
					// pg_dump's output has comments that include build info
					// of postgres and pg_dump. The build info changes with
					// each release. To prevent these changes from causing the
					// test to fail, we strip out comments. Also strip out
					// blank lines.
					"|grep -E -v '(^--|^$|^SET )'",
			})
			require.NoError(t, err)
			require.Equal(t, 0, rc)
			b, err := io.ReadAll(out)
			require.NoError(t, err)
			return bytes.Contains(b, []byte(column))
		}, 5*time.Second, 500*time.Millisecond, column)
	}
}
func TestClickHouseIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -390,6 +588,101 @@ func TestClickHouseIntegration(t *testing.T) {
}
}
// TestClickHouseUpdateSchemeIntegration verifies the table_update_template
// column-addition path against ClickHouse by inspecting
// SHOW CREATE TABLE output after writing metrics with new tags/fields.
func TestClickHouseUpdateSchemeIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	logConfig, err := filepath.Abs("testdata/clickhouse/enable_stdout_log.xml")
	require.NoError(t, err)

	initdb, err := filepath.Abs("testdata/clickhouse/initdb/init.sql")
	require.NoError(t, err)

	// initdb/init.sql creates this database
	const dbname = "foo"

	// username for connecting to clickhouse
	const username = "clickhouse"
	password := testutil.GetRandomString(32)
	outDir := t.TempDir()

	servicePort := "9000"
	container := testutil.Container{
		Image:        "clickhouse",
		ExposedPorts: []string{servicePort, "8123"},
		Env: map[string]string{
			"CLICKHOUSE_USER":     "clickhouse",
			"CLICKHOUSE_PASSWORD": password,
		},
		Files: map[string]string{
			"/docker-entrypoint-initdb.d/script.sql":                initdb,
			"/etc/clickhouse-server/config.d/enable_stdout_log.xml": logConfig,
			"/out":                                                  outDir,
		},
		WaitingFor: wait.ForAll(
			// 8123 is the HTTP interface, 9000 the native protocol port.
			wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")),
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("Ready for connections"),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// use the plugin to write to the database
	// host, port, username, password, dbname
	address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
		container.Address, container.Ports[servicePort], dbname, username, password)
	p := &SQL{
		Driver:              "clickhouse",
		DataSourceName:      address,
		Convert:             defaultConvert,
		TimestampColumn:     "timestamp",
		ConnectionMaxIdle:   2,
		Log:                 testutil.Logger{},
		TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}",
	}
	// ClickHouse-specific type conversions.
	p.Convert.Integer = "Int64"
	p.Convert.Text = "String"
	p.Convert.Timestamp = "DateTime"
	p.Convert.Defaultvalue = "String"
	p.Convert.Unsigned = "UInt64"
	p.Convert.Bool = "UInt8"
	p.Convert.ConversionStyle = "literal"
	require.NoError(t, p.Init())
	require.NoError(t, p.Connect())
	// NOTE(review): p.Close() is never invoked in this test — consider a
	// deferred Close as in the Postgres variant.
	require.NoError(t, p.Write(testMetrics))

	// Write a metric that targets the same table but has additional fields
	// to test the automatic column update functionality.
	require.NoError(t, p.Write(postCreateMetrics))

	// Expected column definitions as rendered by SHOW CREATE TABLE.
	fields := []string{
		"`tag_add_after_create` String",
		"`bool_add_after_create` UInt8",
	}
	for _, column := range fields {
		require.Eventually(t, func() bool {
			var out io.Reader
			// NOTE(review): the exec exit code is discarded here, unlike
			// the MySQL/Postgres variants which assert rc == 0.
			_, out, err = container.Exec([]string{
				"bash",
				"-c",
				"clickhouse-client" +
					" --user=" + username +
					" --database=" + dbname +
					" --format=TabSeparatedRaw" +
					" --multiquery" +
					` --query="SELECT * FROM "metric_one"; SHOW CREATE TABLE "metric_one""`,
			})
			require.NoError(t, err)
			b, err := io.ReadAll(out)
			require.NoError(t, err)
			return bytes.Contains(b, []byte(column))
		}, 5*time.Second, 500*time.Millisecond, column)
	}
}
func TestClickHouseDsnConvert(t *testing.T) {
tests := []struct {
input string

View File

@ -133,3 +133,48 @@ func TestSqlite(t *testing.T) {
require.Equal(t, "string2", k)
require.False(t, rows4.Next())
}
// TestSqliteUpdateScheme exercises the table_update_template path against
// an on-disk SQLite database and checks the resulting schema via the
// sqlite_master table.
func TestSqliteUpdateScheme(t *testing.T) {
	dbfile := filepath.Join(t.TempDir(), "db")
	defer os.Remove(dbfile)

	// Use the plugin to write to the database. The sqlite driver accepts
	// either a plain path or a "file:" URI (fmt.Sprintf("file:%v", dbfile)).
	address := dbfile // accepts a path or a file: URI
	p := &SQL{
		Driver:              "sqlite",
		DataSourceName:      address,
		Convert:             defaultConvert,
		TimestampColumn:     "timestamp",
		ConnectionMaxIdle:   2,
		Log:                 testutil.Logger{},
		TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}",
	}
	require.NoError(t, p.Init())
	require.NoError(t, p.Connect())
	defer p.Close()
	require.NoError(t, p.Write(testMetrics))

	// read directly from the database
	db, err := gosql.Open("sqlite", address)
	require.NoError(t, err)
	defer db.Close()

	var rows *gosql.Rows
	var sql string

	// Write a metric with an additional tag and field; the plugin should
	// ALTER the existing table rather than fail.
	require.NoError(t, p.Write(postCreateMetrics))
	rows, err = db.Query("select sql from sqlite_master")
	require.NoError(t, err)
	defer rows.Close()
	require.True(t, rows.Next())
	require.NoError(t, rows.Scan(&sql))
	// The leading spaces before the two added columns mirror how SQLite
	// stores the ALTER TABLE text verbatim in sqlite_master.
	require.Equal(t,
		`CREATE TABLE "metric_one"("timestamp" TIMESTAMP,"tag_one" TEXT,"tag_two" TEXT,"int64_one" INT,`+
			`"int64_two" INT,"bool_one" BOOL,"bool_two" BOOL,"uint64_one" INT UNSIGNED,"float64_one" DOUBLE,`+
			` "tag_add_after_create" TEXT, "bool_add_after_create" BOOL)`,
		sql,
	)
}