chore(sql): Cleanup code (#16624)

This commit is contained in:
Sven Rebhan 2025-03-14 20:47:58 +01:00 committed by GitHub
parent 5b7659383a
commit ca6193190d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 154 additions and 161 deletions

View File

@ -461,14 +461,12 @@ func (q *query) parse(acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time, l
if q.fieldFilterInt.Match(name) {
v, err := internal.ToInt64(columnData[i])
if err != nil {
if err != nil {
if !errors.Is(err, internal.ErrOutOfRange) {
return 0, fmt.Errorf("converting field column %q to int failed: %w", name, err)
}
logger.Warnf("field column %q: %v", name, err)
}
}
fields[name] = v
continue
}

View File

@ -2,7 +2,6 @@ package sql
import (
"fmt"
"math/rand"
"path/filepath"
"testing"
"time"
@ -16,18 +15,6 @@ import (
"github.com/influxdata/telegraf/testutil"
)
func pwgen(n int) string {
charset := []byte("abcdedfghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
nchars := len(charset)
buffer := make([]byte, 0, n)
for i := 0; i < n; i++ {
buffer = append(buffer, charset[rand.Intn(nchars)])
}
return string(buffer)
}
func TestMariaDBIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -36,7 +23,7 @@ func TestMariaDBIntegration(t *testing.T) {
logger := testutil.Logger{}
port := "3306"
passwd := pwgen(32)
password := testutil.GetRandomString(32)
database := "foo"
// Determine the test-data mountpoint
@ -47,7 +34,7 @@ func TestMariaDBIntegration(t *testing.T) {
Image: "mariadb",
ExposedPorts: []string{port},
Env: map[string]string{
"MYSQL_ROOT_PASSWORD": passwd,
"MYSQL_ROOT_PASSWORD": password,
"MYSQL_DATABASE": database,
},
Files: map[string]string{
@ -58,8 +45,7 @@ func TestMariaDBIntegration(t *testing.T) {
wait.ForListeningPort(nat.Port(port)),
),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Define the testset
@ -99,7 +85,7 @@ func TestMariaDBIntegration(t *testing.T) {
for _, tt := range testset {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin-under-test
dsn := fmt.Sprintf("root:%s@tcp(%s:%s)/%s", passwd, container.Address, container.Ports[port], database)
dsn := fmt.Sprintf("root:%s@tcp(%s:%s)/%s", password, container.Address, container.Ports[port], database)
secret := config.NewSecret([]byte(dsn))
plugin := &SQL{
Driver: "maria",
@ -135,7 +121,7 @@ func TestPostgreSQLIntegration(t *testing.T) {
logger := testutil.Logger{}
port := "5432"
passwd := pwgen(32)
password := testutil.GetRandomString(32)
database := "foo"
// Determine the test-data mountpoint
@ -146,7 +132,7 @@ func TestPostgreSQLIntegration(t *testing.T) {
Image: "postgres",
ExposedPorts: []string{port},
Env: map[string]string{
"POSTGRES_PASSWORD": passwd,
"POSTGRES_PASSWORD": password,
"POSTGRES_DB": database,
},
Files: map[string]string{
@ -157,8 +143,7 @@ func TestPostgreSQLIntegration(t *testing.T) {
wait.ForListeningPort(nat.Port(port)),
),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Define the testset
@ -198,7 +183,7 @@ func TestPostgreSQLIntegration(t *testing.T) {
for _, tt := range testset {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin-under-test
dsn := fmt.Sprintf("postgres://postgres:%v@%v:%v/%v", passwd, container.Address, container.Ports[port], database)
dsn := fmt.Sprintf("postgres://postgres:%s@%s:%s/%s", password, container.Address, container.Ports[port], database)
secret := config.NewSecret([]byte(dsn))
plugin := &SQL{
Driver: "pgx",
@ -252,8 +237,7 @@ func TestClickHouseIntegration(t *testing.T) {
wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'.").WithOccurrence(2),
),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Define the testset
@ -293,7 +277,7 @@ func TestClickHouseIntegration(t *testing.T) {
for _, tt := range testset {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin-under-test
dsn := fmt.Sprintf("tcp://%v:%v?username=%v", container.Address, container.Ports[port], user)
dsn := fmt.Sprintf("tcp://%s:%s?username=%s", container.Address, container.Ports[port], user)
secret := config.NewSecret([]byte(dsn))
plugin := &SQL{
Driver: "clickhouse",

View File

@ -84,12 +84,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""
driver = ""
## Data source name
## The format of the data source name is different for each database driver.
## See the plugin readme for details.
# data_source_name = ""
data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"

View File

@ -3,12 +3,12 @@
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""
driver = ""
## Data source name
## The format of the data source name is different for each database driver.
## See the plugin readme for details.
# data_source_name = ""
data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"

View File

@ -25,6 +25,17 @@ import (
//go:embed sample.conf
var sampleConfig string
var defaultConvert = ConvertStruct{
Integer: "INT",
Real: "DOUBLE",
Text: "TEXT",
Timestamp: "TIMESTAMP",
Defaultvalue: "TEXT",
Unsigned: "UNSIGNED",
Bool: "BOOL",
ConversionStyle: "unsigned_suffix",
}
type ConvertStruct struct {
Integer string `toml:"integer"`
Real string `toml:"real"`
@ -58,20 +69,46 @@ func (*SQL) SampleConfig() string {
return sampleConfig
}
func (p *SQL) Connect() error {
dsn := p.DataSourceName
func (p *SQL) Init() error {
// Set defaults
if p.TableExistsTemplate == "" {
p.TableExistsTemplate = "SELECT 1 FROM {TABLE} LIMIT 1"
}
if p.TimestampColumn == "" {
p.TimestampColumn = "timestamp"
}
if p.TableTemplate == "" {
if p.Driver == "clickhouse" {
dsn = convertClickHouseDsn(dsn, p.Log)
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})"
} else {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})"
}
}
db, err := gosql.Open(p.Driver, dsn)
if err != nil {
return err
// Check for a valid driver
switch p.Driver {
case "clickhouse":
// Convert v1-style Clickhouse DSN to v2-style
p.convertClickHouseDsn()
case "mssql", "mysql", "pgx", "snowflake", "sqlite":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown driver %q", p.Driver)
}
err = db.Ping()
return nil
}
func (p *SQL) Connect() error {
db, err := gosql.Open(p.Driver, p.DataSourceName)
if err != nil {
return err
return fmt.Errorf("creating database client failed: %w", err)
}
if err := db.Ping(); err != nil {
return fmt.Errorf("pinging database failed: %w", err)
}
db.SetConnMaxIdleTime(time.Duration(p.ConnectionMaxIdleTime))
@ -80,9 +117,8 @@ func (p *SQL) Connect() error {
db.SetMaxOpenConns(p.ConnectionMaxOpen)
if p.InitSQL != "" {
_, err = db.Exec(p.InitSQL)
if err != nil {
return err
if _, err = db.Exec(p.InitSQL); err != nil {
return fmt.Errorf("initializing database failed: %w", err)
}
}
@ -275,40 +311,58 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
return nil
}
func (p *SQL) Init() error {
if p.TableExistsTemplate == "" {
p.TableExistsTemplate = "SELECT 1 FROM {TABLE} LIMIT 1"
// Convert a DSN possibly using v1 parameters to clickhouse-go v2 format
func (p *SQL) convertClickHouseDsn() {
u, err := url.Parse(p.DataSourceName)
if err != nil {
return
}
if p.TimestampColumn == "" {
p.TimestampColumn = "timestamp"
query := u.Query()
// Log warnings for parameters no longer supported in clickhouse-go v2
unsupported := []string{"tls_config", "no_delay", "write_timeout", "block_size", "check_connection_liveness"}
for _, paramName := range unsupported {
if query.Has(paramName) {
p.Log.Warnf("DSN parameter '%s' is no longer supported by clickhouse-go v2", paramName)
query.Del(paramName)
}
if p.TableTemplate == "" {
if p.Driver == "clickhouse" {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})"
} else {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})"
}
if query.Get("connection_open_strategy") == "time_random" {
p.Log.Warn("DSN parameter 'connection_open_strategy' can no longer be 'time_random'")
}
// Convert the read_timeout parameter to a duration string
if d := query.Get("read_timeout"); d != "" {
if _, err := strconv.ParseFloat(d, 64); err == nil {
p.Log.Warn("Legacy DSN parameter 'read_timeout' interpreted as seconds")
query.Set("read_timeout", d+"s")
}
}
return nil
// Move database to the path
if d := query.Get("database"); d != "" {
p.Log.Warn("Legacy DSN parameter 'database' converted to new format")
query.Del("database")
u.Path = d
}
// Move alt_hosts to the host part
if altHosts := query.Get("alt_hosts"); altHosts != "" {
p.Log.Warn("Legacy DSN parameter 'alt_hosts' converted to new format")
query.Del("alt_hosts")
u.Host = u.Host + "," + altHosts
}
u.RawQuery = query.Encode()
p.DataSourceName = u.String()
}
func init() {
outputs.Add("sql", func() telegraf.Output { return newSQL() })
}
func newSQL() *SQL {
outputs.Add("sql", func() telegraf.Output {
return &SQL{
Convert: ConvertStruct{
Integer: "INT",
Real: "DOUBLE",
Text: "TEXT",
Timestamp: "TIMESTAMP",
Defaultvalue: "TEXT",
Unsigned: "UNSIGNED",
Bool: "BOOL",
ConversionStyle: "unsigned_suffix",
},
Convert: defaultConvert,
// Defaults for the connection settings (ConnectionMaxIdleTime,
// ConnectionMaxLifetime, ConnectionMaxIdle, and ConnectionMaxOpen)
// mirror the golang defaults. As of go 1.18 all of them default to 0
@ -316,53 +370,5 @@ func newSQL() *SQL {
// https://pkg.go.dev/database/sql#DB.SetMaxIdleConns
ConnectionMaxIdle: 2,
}
}
// Convert a DSN possibly using v1 parameters to clickhouse-go v2 format
func convertClickHouseDsn(dsn string, log telegraf.Logger) string {
p, err := url.Parse(dsn)
if err != nil {
return dsn
}
query := p.Query()
// Log warnings for parameters no longer supported in clickhouse-go v2
unsupported := []string{"tls_config", "no_delay", "write_timeout", "block_size", "check_connection_liveness"}
for _, paramName := range unsupported {
if query.Has(paramName) {
log.Warnf("DSN parameter '%s' is no longer supported by clickhouse-go v2", paramName)
query.Del(paramName)
}
}
if query.Get("connection_open_strategy") == "time_random" {
log.Warn("DSN parameter 'connection_open_strategy' can no longer be 'time_random'")
}
// Convert the read_timeout parameter to a duration string
if d := query.Get("read_timeout"); d != "" {
if _, err := strconv.ParseFloat(d, 64); err == nil {
log.Warn("Legacy DSN parameter 'read_timeout' interpreted as seconds")
query.Set("read_timeout", d+"s")
}
}
// Move database to the path
if d := query.Get("database"); d != "" {
log.Warn("Legacy DSN parameter 'database' converted to new format")
query.Del("database")
p.Path = d
}
// Move alt_hosts to the host part
if altHosts := query.Get("alt_hosts"); altHosts != "" {
log.Warn("Legacy DSN parameter 'alt_hosts' converted to new format")
query.Del("alt_hosts")
p.Host = p.Host + "," + altHosts
}
p.RawQuery = query.Encode()
dsn = p.String()
return dsn
})
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"testing"
@ -19,36 +18,6 @@ import (
"github.com/influxdata/telegraf/testutil"
)
func TestSqlQuoteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func TestSqlCreateStatementIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func TestSqlInsertStatementIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func pwgen(n int) string {
charset := []byte("abcdedfghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
nchars := len(charset)
buffer := make([]byte, 0, n)
for i := 0; i < n; i++ {
buffer = append(buffer, charset[rand.Intn(nchars)])
}
return string(buffer)
}
func stableMetric(
name string,
tags []telegraf.Tag,
@ -169,7 +138,7 @@ func TestMysqlIntegration(t *testing.T) {
// var. We'll use root to insert and query test data.
const username = "root"
password := pwgen(32)
password := testutil.GetRandomString(32)
outDir := t.TempDir()
servicePort := "3306"
@ -195,11 +164,14 @@ func TestMysqlIntegration(t *testing.T) {
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
username, password, container.Address, container.Ports[servicePort], dbname,
)
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "mysql"
p.DataSourceName = address
p.InitSQL = "SET sql_mode='ANSI_QUOTES';"
p := &SQL{
Driver: "mysql",
DataSourceName: address,
Convert: defaultConvert,
InitSQL: "SET sql_mode='ANSI_QUOTES';",
ConnectionMaxIdle: 2,
Log: testutil.Logger{},
}
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
@ -249,7 +221,7 @@ func TestPostgresIntegration(t *testing.T) {
// default username for postgres is postgres
const username = "postgres"
password := pwgen(32)
password := testutil.GetRandomString(32)
outDir := t.TempDir()
servicePort := "5432"
@ -276,10 +248,13 @@ func TestPostgresIntegration(t *testing.T) {
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
username, password, container.Address, container.Ports[servicePort], dbname,
)
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "pgx"
p.DataSourceName = address
p := &SQL{
Driver: "pgx",
DataSourceName: address,
Convert: defaultConvert,
ConnectionMaxIdle: 2,
Log: testutil.Logger{},
}
p.Convert.Real = "double precision"
p.Convert.Unsigned = "bigint"
p.Convert.ConversionStyle = "literal"
@ -335,7 +310,7 @@ func TestClickHouseIntegration(t *testing.T) {
// username for connecting to clickhouse
const username = "clickhouse"
password := pwgen(32)
password := testutil.GetRandomString(32)
outDir := t.TempDir()
servicePort := "9000"
@ -364,10 +339,13 @@ func TestClickHouseIntegration(t *testing.T) {
// host, port, username, password, dbname
address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
container.Address, container.Ports[servicePort], dbname, username, password)
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "clickhouse"
p.DataSourceName = address
p := &SQL{
Driver: "clickhouse",
DataSourceName: address,
Convert: defaultConvert,
ConnectionMaxIdle: 2,
Log: testutil.Logger{},
}
p.Convert.Integer = "Int64"
p.Convert.Text = "String"
p.Convert.Timestamp = "DateTime"
@ -446,8 +424,13 @@ func TestClickHouseDsnConvert(t *testing.T) {
},
}
log := testutil.Logger{}
for _, test := range tests {
require.Equal(t, test.expected, convertClickHouseDsn(test.input, log))
for _, tt := range tests {
plugin := &SQL{
Driver: "clickhouse",
DataSourceName: tt.input,
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
require.Equal(t, tt.expected, plugin.DataSourceName)
}
}

View File

@ -21,10 +21,13 @@ func TestSqlite(t *testing.T) {
// Use the plugin to write to the database address :=
// fmt.Sprintf("file:%v", dbfile)
address := dbfile // accepts a path or a file: URI
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "sqlite"
p.DataSourceName = address
p := &SQL{
Driver: "sqlite",
DataSourceName: address,
Convert: defaultConvert,
ConnectionMaxIdle: 2,
Log: testutil.Logger{},
}
require.NoError(t, p.Init())
require.NoError(t, p.Connect())

View File

@ -2,6 +2,7 @@ package testutil
import (
"fmt"
"math/rand"
"net"
"net/url"
"os"
@ -42,6 +43,24 @@ func GetLocalHost() string {
return localhost
}
// GetRandomString returns a random alphanumerical string of the given length.
// Please note, this function is different to `internal.RandomString` as it will
// not use `crypto.Rand` and will therefore not rely on the entropy-pool of the
// host which might be drained e.g. in CI pipelines. This is useful to e.g.
// create random passwords for tests where security is not a concern.
func GetRandomString(chars int) string {
charset := []byte("abcdefghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
nchars := len(charset)
buffer := make([]byte, chars)
for i := range chars {
//nolint:gosec // Using a weak random number generator on purpose to not drain entropy
buffer[i] = charset[rand.Intn(nchars)]
}
return string(buffer)
}
// MockMetrics returns a mock []telegraf.Metric object for using in unit tests
// of telegraf output sinks.
func MockMetrics() []telegraf.Metric {