From 49dc1270a3047873c19beeb6a5c4bdf8f0cbbba3 Mon Sep 17 00:00:00 2001 From: hautecodure <945480+hautecodure@users.noreply.github.com> Date: Tue, 1 Apr 2025 21:51:20 +0300 Subject: [PATCH] feat(outputs.sql): Add option to automate table schema updates (#16544) --- plugins/outputs/sql/README.md | 33 +++- plugins/outputs/sql/sample.conf | 9 + plugins/outputs/sql/sql.go | 108 ++++++++++- plugins/outputs/sql/sql_test.go | 293 +++++++++++++++++++++++++++++ plugins/outputs/sql/sqlite_test.go | 45 +++++ 5 files changed, 478 insertions(+), 10 deletions(-) diff --git a/plugins/outputs/sql/README.md b/plugins/outputs/sql/README.md index dfbb3cedc..3aa1a7b04 100644 --- a/plugins/outputs/sql/README.md +++ b/plugins/outputs/sql/README.md @@ -40,8 +40,8 @@ driver selected. Through the nature of the inputs plugins, the amounts of columns inserted within rows for a given metric may differ. Since the tables are created based on the tags and fields available within an input metric, it's possible the created -table won't contain all the necessary columns. You might need to initialize -the schema yourself, to avoid this scenario. +table won't contain all the necessary columns. If you wish to automate table +updates, check out the [Schema updates](#schema-updates) section for more info. ## Advanced options @@ -110,6 +110,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. 
## {TABLE} - tablename as a quoted identifier # table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1" + ## Table update template, available template variables: + ## {TABLE} - table name as a quoted identifier + ## {COLUMN} - column definition (quoted identifier and type) + ## NOTE: Ensure the user (you're using to write to the database) has necessary permissions + ## + ## Use the following setting for automatically adding columns: + ## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}" + # table_update_template = "" + ## Initialization SQL # init_sql = "" @@ -155,6 +164,26 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # # conversion_style = "unsigned_suffix" ``` +## Schema updates + +The default behavior of this plugin is to create a schema for the table, +based on the current metric (for both fields and tags). However, writing +subsequent metrics with additional fields or tags will result in errors. + +If you wish the plugin to sync the column schema for every metric, +specify the `table_update_template` setting in your config file. + +> [!NOTE] The following snippet contains a generic query that your +> database may (or may not) support. Consult your database's +> documentation for proper syntax and table/column options.
+ +```toml +# Save metrics to an SQL Database +[[outputs.sql]] + ## Table update template + table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}" +``` + ## Driver-specific information ### go-sql-driver/mysql diff --git a/plugins/outputs/sql/sample.conf b/plugins/outputs/sql/sample.conf index 59f727afd..6aca68ddc 100644 --- a/plugins/outputs/sql/sample.conf +++ b/plugins/outputs/sql/sample.conf @@ -29,6 +29,15 @@ ## {TABLE} - tablename as a quoted identifier # table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1" + ## Table update template, available template variables: + ## {TABLE} - table name as a quoted identifier + ## {COLUMN} - column definition (quoted identifier and type) + ## NOTE: Ensure the user (you're using to write to the database) has necessary permissions + ## + ## Use the following setting for automatically adding columns: + ## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}" + # table_update_template = "" + ## Initialization SQL # init_sql = "" diff --git a/plugins/outputs/sql/sql.go b/plugins/outputs/sql/sql.go index ee9a6b557..07f843bc6 100644 --- a/plugins/outputs/sql/sql.go +++ b/plugins/outputs/sql/sql.go @@ -53,6 +53,7 @@ type SQL struct { TimestampColumn string `toml:"timestamp_column"` TableTemplate string `toml:"table_template"` TableExistsTemplate string `toml:"table_exists_template"` + TableUpdateTemplate string `toml:"table_update_template"` InitSQL string `toml:"init_sql"` Convert ConvertStruct `toml:"convert"` ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"` @@ -61,8 +62,9 @@ type SQL struct { ConnectionMaxOpen int `toml:"connection_max_open"` Log telegraf.Logger `toml:"-"` - db *gosql.DB - tables map[string]bool + db *gosql.DB + tables map[string]map[string]bool + tableListColumnsTemplate string } func (*SQL) SampleConfig() string { @@ -83,6 +85,11 @@ func (p *SQL) Init() error { } } + p.tableListColumnsTemplate = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE 
TABLE_NAME={TABLE}" + if p.Driver == "sqlite" { + p.tableListColumnsTemplate = "SELECT name AS column_name FROM pragma_table_info({TABLE})" + } + // Check for a valid driver switch p.Driver { case "clickhouse": @@ -119,7 +126,7 @@ func (p *SQL) Connect() error { } p.db = db - p.tables = make(map[string]bool) + p.tables = make(map[string]map[string]bool) return nil } @@ -209,6 +216,14 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string { return query } +func (p *SQL) generateAddColumn(tablename, column, columnType string) string { + query := p.TableUpdateTemplate + query = strings.ReplaceAll(query, "{TABLE}", quoteIdent(tablename)) + query = strings.ReplaceAll(query, "{COLUMN}", quoteIdent(column)+" "+columnType) + + return query +} + func (p *SQL) generateInsert(tablename string, columns []string) string { placeholders := make([]string, 0, len(columns)) quotedColumns := make([]string, 0, len(columns)) @@ -233,6 +248,50 @@ func (p *SQL) generateInsert(tablename string, columns []string) string { strings.Join(placeholders, ",")) } +func (p *SQL) createTable(metric telegraf.Metric) error { + tablename := metric.Name() + stmt := p.generateCreateTable(metric) + if _, err := p.db.Exec(stmt); err != nil { + return fmt.Errorf("creating table failed: %w", err) + } + // Ensure compatibility: set the table cache to an empty map + p.tables[tablename] = make(map[string]bool) + // Modifying the table schema is opt-in + if p.TableUpdateTemplate != "" { + if err := p.updateTableCache(tablename); err != nil { + return fmt.Errorf("updating table cache failed: %w", err) + } + } + return nil +} + +func (p *SQL) createColumn(tablename, column, columnType string) error { + // Ensure table exists in cache before accessing columns + if _, tableExists := p.tables[tablename]; !tableExists { + if err := p.updateTableCache(tablename); err != nil { + return fmt.Errorf("updating table cache failed: %w", err) + } + } + // Ensure column existence check doesn't panic + if _, 
tableExists := p.tables[tablename]; !tableExists { + return fmt.Errorf("table %s does not exist in cache", tablename) + } + // Column already exists, nothing to do + if exists, colExists := p.tables[tablename][column]; colExists && exists { + return nil + } + // Generate and execute column addition statement + createColumn := p.generateAddColumn(tablename, column, columnType) + if _, err := p.db.Exec(createColumn); err != nil { + return fmt.Errorf("creating column failed: %w", err) + } + // Update cache after adding the column + if err := p.updateTableCache(tablename); err != nil { + return fmt.Errorf("updating table cache failed: %w", err) + } + return nil +} + func (p *SQL) tableExists(tableName string) bool { stmt := strings.ReplaceAll(p.TableExistsTemplate, "{TABLE}", quoteIdent(tableName)) @@ -240,6 +299,33 @@ func (p *SQL) tableExists(tableName string) bool { return err == nil } +func (p *SQL) updateTableCache(tablename string) error { + stmt := strings.ReplaceAll(p.tableListColumnsTemplate, "{TABLE}", quoteStr(tablename)) + + columns, err := p.db.Query(stmt) + if err != nil { + return fmt.Errorf("fetching columns for table(%s) failed: %w", tablename, err) + } + defer columns.Close() + + if p.tables[tablename] == nil { + p.tables[tablename] = make(map[string]bool) + } + + for columns.Next() { + var columnName string + if err := columns.Scan(&columnName); err != nil { + return err + } + + if !p.tables[tablename][columnName] { + p.tables[tablename][columnName] = true + } + } + + return nil +} + func (p *SQL) Write(metrics []telegraf.Metric) error { var err error @@ -247,14 +333,11 @@ func (p *SQL) Write(metrics []telegraf.Metric) error { tablename := metric.Name() // create table if needed - if !p.tables[tablename] && !p.tableExists(tablename) { - createStmt := p.generateCreateTable(metric) - _, err := p.db.Exec(createStmt) - if err != nil { + if _, found := p.tables[tablename]; !found && !p.tableExists(tablename) { + if err := p.createTable(metric); err != nil 
{ return err } } - p.tables[tablename] = true var columns []string var values []interface{} @@ -274,6 +357,15 @@ func (p *SQL) Write(metrics []telegraf.Metric) error { values = append(values, value) } + // Modifying the table schema is opt-in + if p.TableUpdateTemplate != "" { + for i := range len(columns) { + if err := p.createColumn(tablename, columns[i], p.deriveDatatype(values[i])); err != nil { + return err + } + } + } + sql := p.generateInsert(tablename, columns) switch p.Driver { diff --git a/plugins/outputs/sql/sql_test.go b/plugins/outputs/sql/sql_test.go index 8c3b38e05..9ea2350b5 100644 --- a/plugins/outputs/sql/sql_test.go +++ b/plugins/outputs/sql/sql_test.go @@ -121,6 +121,26 @@ var ( ts, ), } + + postCreateMetrics = []telegraf.Metric{ + stableMetric( + "metric_one", + []telegraf.Tag{ + { + Key: "tag_add_after_create", + Value: "tag2", + }, + }, + []telegraf.Field{ + { + Key: "bool_add_after_create", + Value: true, + }, + }, + ts, + telegraf.Untyped, + ), + } ) func TestMysqlIntegration(t *testing.T) { @@ -208,6 +228,90 @@ func TestMysqlIntegration(t *testing.T) { } } +func TestMysqlUpdateSchemeIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + initdb, err := filepath.Abs("testdata/mariadb/initdb/script.sql") + require.NoError(t, err) + + // initdb/script.sql creates this database + const dbname = "foo" + + // The mariadb image lets you set the root password through an env + // var. We'll use root to insert and query test data. 
+ const username = "root" + + password := testutil.GetRandomString(32) + outDir := t.TempDir() + + servicePort := "3306" + container := testutil.Container{ + Image: "mariadb", + Env: map[string]string{ + "MARIADB_ROOT_PASSWORD": password, + }, + Files: map[string]string{ + "/docker-entrypoint-initdb.d/script.sql": initdb, + "/out": outDir, + }, + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("mariadbd: ready for connections.").WithOccurrence(2), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // use the plugin to write to the database + address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", + username, password, container.Address, container.Ports[servicePort], dbname, + ) + p := &SQL{ + Driver: "mysql", + DataSourceName: address, + Convert: defaultConvert, + InitSQL: "SET sql_mode='ANSI_QUOTES';", + TimestampColumn: "timestamp", + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}", + } + require.NoError(t, p.Init()) + require.NoError(t, p.Connect()) + require.NoError(t, p.Write(testMetrics)) + // Write a metric that targets the same table but has additional fields + // to test the automatic column update functionality. 
+ require.NoError(t, p.Write(postCreateMetrics)) + + fields := []string{ + "`tag_add_after_create` text DEFAULT NULL", + "`bool_add_after_create` tinyint(1) DEFAULT NULL", + } + for _, column := range fields { + require.Eventually(t, func() bool { + rc, out, err := container.Exec([]string{ + "bash", + "-c", + "mariadb-dump --user=" + username + + " --password=" + password + + " --compact" + + " --skip-opt " + + dbname, + }) + require.NoError(t, err) + require.Equal(t, 0, rc) + + b, err := io.ReadAll(out) + require.NoError(t, err) + + return bytes.Contains(b, []byte(column)) + }, 10*time.Second, 500*time.Millisecond, column) + } +} + func TestPostgresIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -295,6 +399,100 @@ func TestPostgresIntegration(t *testing.T) { }, 5*time.Second, 500*time.Millisecond) } +func TestPostgresUpdateSchemeIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + initdb, err := filepath.Abs("testdata/postgres/initdb/init.sql") + require.NoError(t, err) + + // initdb/init.sql creates this database + const dbname = "foo" + + // default username for postgres is postgres + const username = "postgres" + + password := testutil.GetRandomString(32) + outDir := t.TempDir() + + servicePort := "5432" + container := testutil.Container{ + Image: "postgres", + Env: map[string]string{ + "POSTGRES_PASSWORD": password, + }, + Files: map[string]string{ + "/docker-entrypoint-initdb.d/script.sql": initdb, + "/out": outDir, + }, + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("database system is ready to accept connections").WithOccurrence(2), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // use the plugin to write to the database + // host, port, username, password, dbname + address := 
fmt.Sprintf("postgres://%v:%v@%v:%v/%v", + username, password, container.Address, container.Ports[servicePort], dbname, + ) + p := &SQL{ + Driver: "pgx", + DataSourceName: address, + Convert: defaultConvert, + TimestampColumn: "timestamp", + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}", + } + p.Convert.Real = "double precision" + p.Convert.Unsigned = "bigint" + p.Convert.ConversionStyle = "literal" + require.NoError(t, p.Init()) + + require.NoError(t, p.Connect()) + defer p.Close() + require.NoError(t, p.Write(testMetrics)) + // Write a metric that targets the same table but has additional fields + // to test the automatic column update functionality. + require.NoError(t, p.Write(postCreateMetrics)) + require.NoError(t, p.Close()) + + fields := []string{ + "tag_add_after_create text", + "bool_add_after_create boolean", + } + for _, column := range fields { + require.Eventually(t, func() bool { + rc, out, err := container.Exec([]string{ + "bash", + "-c", + "pg_dump" + + " --username=" + username + + " --no-comments" + + " " + dbname + + // pg_dump's output has comments that include build info + // of postgres and pg_dump. The build info changes with + // each release. To prevent these changes from causing the + // test to fail, we strip out comments. Also strip out + // blank lines. 
+ "|grep -E -v '(^--|^$|^SET )'", + }) + require.NoError(t, err) + require.Equal(t, 0, rc) + + b, err := io.ReadAll(out) + require.NoError(t, err) + + return bytes.Contains(b, []byte(column)) + }, 5*time.Second, 500*time.Millisecond, column) + } +} + func TestClickHouseIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -390,6 +588,101 @@ func TestClickHouseIntegration(t *testing.T) { } } +func TestClickHouseUpdateSchemeIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + logConfig, err := filepath.Abs("testdata/clickhouse/enable_stdout_log.xml") + require.NoError(t, err) + + initdb, err := filepath.Abs("testdata/clickhouse/initdb/init.sql") + require.NoError(t, err) + + // initdb/init.sql creates this database + const dbname = "foo" + + // username for connecting to clickhouse + const username = "clickhouse" + + password := testutil.GetRandomString(32) + outDir := t.TempDir() + + servicePort := "9000" + container := testutil.Container{ + Image: "clickhouse", + ExposedPorts: []string{servicePort, "8123"}, + Env: map[string]string{ + "CLICKHOUSE_USER": "clickhouse", + "CLICKHOUSE_PASSWORD": password, + }, + Files: map[string]string{ + "/docker-entrypoint-initdb.d/script.sql": initdb, + "/etc/clickhouse-server/config.d/enable_stdout_log.xml": logConfig, + "/out": outDir, + }, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")), + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Ready for connections"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // use the plugin to write to the database + // host, port, username, password, dbname + address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s", + container.Address, container.Ports[servicePort], dbname, username, password) + p := &SQL{ + Driver: "clickhouse", + DataSourceName: address, + Convert: 
defaultConvert, + TimestampColumn: "timestamp", + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}", + } + p.Convert.Integer = "Int64" + p.Convert.Text = "String" + p.Convert.Timestamp = "DateTime" + p.Convert.Defaultvalue = "String" + p.Convert.Unsigned = "UInt64" + p.Convert.Bool = "UInt8" + p.Convert.ConversionStyle = "literal" + require.NoError(t, p.Init()) + + require.NoError(t, p.Connect()) + require.NoError(t, p.Write(testMetrics)) + // Write a metric that targets the same table but has additional fields + // to test the automatic column update functionality. + require.NoError(t, p.Write(postCreateMetrics)) + + fields := []string{ + "`tag_add_after_create` String", + "`bool_add_after_create` UInt8", + } + for _, column := range fields { + require.Eventually(t, func() bool { + var out io.Reader + _, out, err = container.Exec([]string{ + "bash", + "-c", + "clickhouse-client" + + " --user=" + username + + " --database=" + dbname + + " --format=TabSeparatedRaw" + + " --multiquery" + + ` --query="SELECT * FROM "metric_one"; SHOW CREATE TABLE "metric_one""`, + }) + require.NoError(t, err) + b, err := io.ReadAll(out) + require.NoError(t, err) + return bytes.Contains(b, []byte(column)) + }, 5*time.Second, 500*time.Millisecond, column) + } +} + func TestClickHouseDsnConvert(t *testing.T) { tests := []struct { input string diff --git a/plugins/outputs/sql/sqlite_test.go b/plugins/outputs/sql/sqlite_test.go index 1c6d416d0..4c0c1594d 100644 --- a/plugins/outputs/sql/sqlite_test.go +++ b/plugins/outputs/sql/sqlite_test.go @@ -133,3 +133,48 @@ func TestSqlite(t *testing.T) { require.Equal(t, "string2", k) require.False(t, rows4.Next()) } + +func TestSqliteUpdateScheme(t *testing.T) { + dbfile := filepath.Join(t.TempDir(), "db") + defer os.Remove(dbfile) + + // Use the plugin to write to the database address := + // fmt.Sprintf("file:%v", dbfile) + address := dbfile // accepts a path or a file: URI + p := 
&SQL{ + Driver: "sqlite", + DataSourceName: address, + Convert: defaultConvert, + TimestampColumn: "timestamp", + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + TableUpdateTemplate: "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}", + } + require.NoError(t, p.Init()) + + require.NoError(t, p.Connect()) + defer p.Close() + require.NoError(t, p.Write(testMetrics)) + + // read directly from the database + db, err := gosql.Open("sqlite", address) + require.NoError(t, err) + defer db.Close() + + var rows *gosql.Rows + var sql string + + require.NoError(t, p.Write(postCreateMetrics)) + + rows, err = db.Query("select sql from sqlite_master") + require.NoError(t, err) + defer rows.Close() + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&sql)) + require.Equal(t, + `CREATE TABLE "metric_one"("timestamp" TIMESTAMP,"tag_one" TEXT,"tag_two" TEXT,"int64_one" INT,`+ + `"int64_two" INT,"bool_one" BOOL,"bool_two" BOOL,"uint64_one" INT UNSIGNED,"float64_one" DOUBLE,`+ + ` "tag_add_after_create" TEXT, "bool_add_after_create" BOOL)`, + sql, + ) +}