chore(outputs.sql): Adapt default templates for ClickHouse (#16464)

This commit is contained in:
AndreKR 2025-03-03 20:41:33 +01:00 committed by GitHub
parent ccf576174c
commit 9df54ae8ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 34 additions and 4 deletions

View File

@ -99,7 +99,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## {TABLE} - table name as a quoted identifier ## {TABLE} - table name as a quoted identifier
## {TABLELITERAL} - table name as a quoted string literal ## {TABLELITERAL} - table name as a quoted string literal
## {COLUMNS} - column definitions (list of quoted identifiers and types) ## {COLUMNS} - column definitions (list of quoted identifiers and types)
## {TAG_COLUMN_NAMES} - tag column definitions (list of quoted identifiers)
## {TIMESTAMP_COLUMN_NAME} - the name of the time stamp column, as configured above
# table_template = "CREATE TABLE {TABLE}({COLUMNS})" # table_template = "CREATE TABLE {TABLE}({COLUMNS})"
## NOTE: For the clickhouse driver the default is:
# table_template = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})"
## Table existence check template ## Table existence check template
## Available template variables: ## Available template variables:

View File

@ -18,7 +18,11 @@
## {TABLE} - table name as a quoted identifier ## {TABLE} - table name as a quoted identifier
## {TABLELITERAL} - table name as a quoted string literal ## {TABLELITERAL} - table name as a quoted string literal
## {COLUMNS} - column definitions (list of quoted identifiers and types) ## {COLUMNS} - column definitions (list of quoted identifiers and types)
## {TAG_COLUMN_NAMES} - tag column definitions (list of quoted identifiers)
## {TIMESTAMP_COLUMN_NAME} - the name of the time stamp column, as configured above
# table_template = "CREATE TABLE {TABLE}({COLUMNS})" # table_template = "CREATE TABLE {TABLE}({COLUMNS})"
## NOTE: For the clickhouse driver the default is:
# table_template = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})"
## Table existence check template ## Table existence check template
## Available template variables: ## Available template variables:

View File

@ -150,6 +150,7 @@ func (p *SQL) deriveDatatype(value interface{}) string {
func (p *SQL) generateCreateTable(metric telegraf.Metric) string { func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
columns := make([]string, 0, len(metric.TagList())+len(metric.FieldList())+1) columns := make([]string, 0, len(metric.TagList())+len(metric.FieldList())+1)
tagColumnNames := make([]string, 0, len(metric.TagList()))
if p.TimestampColumn != "" { if p.TimestampColumn != "" {
columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(p.TimestampColumn), p.Convert.Timestamp)) columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(p.TimestampColumn), p.Convert.Timestamp))
@ -157,6 +158,7 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
for _, tag := range metric.TagList() { for _, tag := range metric.TagList() {
columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(tag.Key), p.Convert.Text)) columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(tag.Key), p.Convert.Text))
tagColumnNames = append(tagColumnNames, quoteIdent(tag.Key))
} }
var datatype string var datatype string
@ -169,6 +171,8 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
query = strings.ReplaceAll(query, "{TABLE}", quoteIdent(metric.Name())) query = strings.ReplaceAll(query, "{TABLE}", quoteIdent(metric.Name()))
query = strings.ReplaceAll(query, "{TABLELITERAL}", quoteStr(metric.Name())) query = strings.ReplaceAll(query, "{TABLELITERAL}", quoteStr(metric.Name()))
query = strings.ReplaceAll(query, "{COLUMNS}", strings.Join(columns, ",")) query = strings.ReplaceAll(query, "{COLUMNS}", strings.Join(columns, ","))
query = strings.ReplaceAll(query, "{TAG_COLUMN_NAMES}", strings.Join(tagColumnNames, ","))
query = strings.ReplaceAll(query, "{TIMESTAMP_COLUMN_NAME}", quoteIdent(p.TimestampColumn))
return query return query
} }
@ -271,15 +275,30 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
func (p *SQL) Init() error {
if p.TableExistsTemplate == "" {
p.TableExistsTemplate = "SELECT 1 FROM {TABLE} LIMIT 1"
}
if p.TimestampColumn == "" {
p.TimestampColumn = "timestamp"
}
if p.TableTemplate == "" {
if p.Driver == "clickhouse" {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})"
} else {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})"
}
}
return nil
}
func init() { func init() {
outputs.Add("sql", func() telegraf.Output { return newSQL() }) outputs.Add("sql", func() telegraf.Output { return newSQL() })
} }
func newSQL() *SQL { func newSQL() *SQL {
return &SQL{ return &SQL{
TableTemplate: "CREATE TABLE {TABLE}({COLUMNS})",
TableExistsTemplate: "SELECT 1 FROM {TABLE} LIMIT 1",
TimestampColumn: "timestamp",
Convert: ConvertStruct{ Convert: ConvertStruct{
Integer: "INT", Integer: "INT",
Real: "DOUBLE", Real: "DOUBLE",

View File

@ -200,6 +200,7 @@ func TestMysqlIntegration(t *testing.T) {
p.Driver = "mysql" p.Driver = "mysql"
p.DataSourceName = address p.DataSourceName = address
p.InitSQL = "SET sql_mode='ANSI_QUOTES';" p.InitSQL = "SET sql_mode='ANSI_QUOTES';"
require.NoError(t, p.Init())
require.NoError(t, p.Connect()) require.NoError(t, p.Connect())
require.NoError(t, p.Write(testMetrics)) require.NoError(t, p.Write(testMetrics))
@ -282,6 +283,7 @@ func TestPostgresIntegration(t *testing.T) {
p.Convert.Real = "double precision" p.Convert.Real = "double precision"
p.Convert.Unsigned = "bigint" p.Convert.Unsigned = "bigint"
p.Convert.ConversionStyle = "literal" p.Convert.ConversionStyle = "literal"
require.NoError(t, p.Init())
require.NoError(t, p.Connect()) require.NoError(t, p.Connect())
defer p.Close() defer p.Close()
@ -366,7 +368,6 @@ func TestClickHouseIntegration(t *testing.T) {
p.Log = testutil.Logger{} p.Log = testutil.Logger{}
p.Driver = "clickhouse" p.Driver = "clickhouse"
p.DataSourceName = address p.DataSourceName = address
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ENGINE MergeTree() ORDER by timestamp"
p.Convert.Integer = "Int64" p.Convert.Integer = "Int64"
p.Convert.Text = "String" p.Convert.Text = "String"
p.Convert.Timestamp = "DateTime" p.Convert.Timestamp = "DateTime"
@ -374,6 +375,7 @@ func TestClickHouseIntegration(t *testing.T) {
p.Convert.Unsigned = "UInt64" p.Convert.Unsigned = "UInt64"
p.Convert.Bool = "UInt8" p.Convert.Bool = "UInt8"
p.Convert.ConversionStyle = "literal" p.Convert.ConversionStyle = "literal"
require.NoError(t, p.Init())
require.NoError(t, p.Connect()) require.NoError(t, p.Connect())
require.NoError(t, p.Write(testMetrics)) require.NoError(t, p.Write(testMetrics))

View File

@ -25,6 +25,7 @@ func TestSqlite(t *testing.T) {
p.Log = testutil.Logger{} p.Log = testutil.Logger{}
p.Driver = "sqlite" p.Driver = "sqlite"
p.DataSourceName = address p.DataSourceName = address
require.NoError(t, p.Init())
require.NoError(t, p.Connect()) require.NoError(t, p.Connect())
defer p.Close() defer p.Close()