diff --git a/plugins/inputs/sql/sql.go b/plugins/inputs/sql/sql.go index 5c34ac7a2..1bcb0cb7b 100644 --- a/plugins/inputs/sql/sql.go +++ b/plugins/inputs/sql/sql.go @@ -462,12 +462,10 @@ func (q *query) parse(acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time, l if q.fieldFilterInt.Match(name) { v, err := internal.ToInt64(columnData[i]) if err != nil { - if err != nil { - if !errors.Is(err, internal.ErrOutOfRange) { - return 0, fmt.Errorf("converting field column %q to int failed: %w", name, err) - } - logger.Warnf("field column %q: %v", name, err) + if !errors.Is(err, internal.ErrOutOfRange) { + return 0, fmt.Errorf("converting field column %q to int failed: %w", name, err) } + logger.Warnf("field column %q: %v", name, err) } fields[name] = v continue diff --git a/plugins/inputs/sql/sql_test.go b/plugins/inputs/sql/sql_test.go index f74d60b8a..2bca70528 100644 --- a/plugins/inputs/sql/sql_test.go +++ b/plugins/inputs/sql/sql_test.go @@ -2,7 +2,6 @@ package sql import ( "fmt" - "math/rand" "path/filepath" "testing" "time" @@ -16,18 +15,6 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func pwgen(n int) string { - charset := []byte("abcdedfghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") - nchars := len(charset) - - buffer := make([]byte, 0, n) - for i := 0; i < n; i++ { - buffer = append(buffer, charset[rand.Intn(nchars)]) - } - - return string(buffer) -} - func TestMariaDBIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -36,7 +23,7 @@ func TestMariaDBIntegration(t *testing.T) { logger := testutil.Logger{} port := "3306" - passwd := pwgen(32) + password := testutil.GetRandomString(32) database := "foo" // Determine the test-data mountpoint @@ -47,7 +34,7 @@ func TestMariaDBIntegration(t *testing.T) { Image: "mariadb", ExposedPorts: []string{port}, Env: map[string]string{ - "MYSQL_ROOT_PASSWORD": passwd, + "MYSQL_ROOT_PASSWORD": password, "MYSQL_DATABASE": database, }, Files: map[string]string{ @@ -58,8 +45,7 @@ func TestMariaDBIntegration(t *testing.T) { wait.ForListeningPort(nat.Port(port)), ), } - err = container.Start() - require.NoError(t, err, "failed to start container") + require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() // Define the testset @@ -99,7 +85,7 @@ func TestMariaDBIntegration(t *testing.T) { for _, tt := range testset { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test - dsn := fmt.Sprintf("root:%s@tcp(%s:%s)/%s", passwd, container.Address, container.Ports[port], database) + dsn := fmt.Sprintf("root:%s@tcp(%s:%s)/%s", password, container.Address, container.Ports[port], database) secret := config.NewSecret([]byte(dsn)) plugin := &SQL{ Driver: "maria", @@ -135,7 +121,7 @@ func TestPostgreSQLIntegration(t *testing.T) { logger := testutil.Logger{} port := "5432" - passwd := pwgen(32) + password := testutil.GetRandomString(32) database := "foo" // Determine the test-data mountpoint @@ -146,7 +132,7 @@ func TestPostgreSQLIntegration(t *testing.T) { Image: "postgres", ExposedPorts: []string{port}, Env: map[string]string{ - "POSTGRES_PASSWORD": passwd, + "POSTGRES_PASSWORD": password, "POSTGRES_DB": database, }, Files: map[string]string{ @@ -157,8 +143,7 @@ func TestPostgreSQLIntegration(t *testing.T) { wait.ForListeningPort(nat.Port(port)), ), } - err = container.Start() - require.NoError(t, err, "failed to start container") + require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() // Define the testset @@ -198,7 +183,7 @@ func TestPostgreSQLIntegration(t *testing.T) { for _, tt := range testset { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test - dsn := fmt.Sprintf("postgres://postgres:%v@%v:%v/%v", passwd, container.Address, container.Ports[port], database) + dsn := fmt.Sprintf("postgres://postgres:%s@%s:%s/%s", password, container.Address, container.Ports[port], database) secret := config.NewSecret([]byte(dsn)) plugin := &SQL{ Driver: "pgx", @@ -252,8 +237,7 @@ func TestClickHouseIntegration(t *testing.T) { wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'.").WithOccurrence(2), ), } - err = container.Start() - require.NoError(t, err, "failed to start container") + require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() // Define the testset @@ -293,7 +277,7 @@ func TestClickHouseIntegration(t *testing.T) { for _, tt := range testset { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test - dsn := fmt.Sprintf("tcp://%v:%v?username=%v", container.Address, container.Ports[port], user) + dsn := fmt.Sprintf("tcp://%s:%s?username=%s", container.Address, container.Ports[port], user) secret := config.NewSecret([]byte(dsn)) plugin := &SQL{ Driver: "clickhouse", diff --git a/plugins/outputs/sql/README.md b/plugins/outputs/sql/README.md index f1f66f803..3a4cefe44 100644 --- a/plugins/outputs/sql/README.md +++ b/plugins/outputs/sql/README.md @@ -84,12 +84,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Database driver ## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres), ## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse) - # driver = "" + driver = "" ## Data source name ## The format of the data source name is different for each database driver. ## See the plugin readme for details. - # data_source_name = "" + data_source_name = "" ## Timestamp column name # timestamp_column = "timestamp" diff --git a/plugins/outputs/sql/sample.conf b/plugins/outputs/sql/sample.conf index f1b7f297f..9535b861a 100644 --- a/plugins/outputs/sql/sample.conf +++ b/plugins/outputs/sql/sample.conf @@ -3,12 +3,12 @@ ## Database driver ## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres), ## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse) - # driver = "" + driver = "" ## Data source name ## The format of the data source name is different for each database driver. ## See the plugin readme for details. - # data_source_name = "" + data_source_name = "" ## Timestamp column name # timestamp_column = "timestamp" diff --git a/plugins/outputs/sql/sql.go b/plugins/outputs/sql/sql.go index b3b1f6943..ec7e90806 100644 --- a/plugins/outputs/sql/sql.go +++ b/plugins/outputs/sql/sql.go @@ -25,6 +25,17 @@ import ( //go:embed sample.conf var sampleConfig string +var defaultConvert = ConvertStruct{ + Integer: "INT", + Real: "DOUBLE", + Text: "TEXT", + Timestamp: "TIMESTAMP", + Defaultvalue: "TEXT", + Unsigned: "UNSIGNED", + Bool: "BOOL", + ConversionStyle: "unsigned_suffix", +} + type ConvertStruct struct { Integer string `toml:"integer"` Real string `toml:"real"` @@ -58,20 +69,46 @@ func (*SQL) SampleConfig() string { return sampleConfig } +func (p *SQL) Init() error { + // Set defaults + if p.TableExistsTemplate == "" { + p.TableExistsTemplate = "SELECT 1 FROM {TABLE} LIMIT 1" + } + + if p.TimestampColumn == "" { + p.TimestampColumn = "timestamp" + } + + if p.TableTemplate == "" { + if p.Driver == "clickhouse" { + p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})" + } else { + p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})" + } + } + + // Check for a valid driver + switch p.Driver { + case "clickhouse": + // Convert v1-style Clickhouse DSN to v2-style + p.convertClickHouseDsn() + case "mssql", "mysql", "pgx", "snowflake", "sqlite": + // Do nothing, those are valid + default: + return fmt.Errorf("unknown driver %q", p.Driver) + } + + return nil +} + func (p *SQL) Connect() error { - dsn := p.DataSourceName - if p.Driver == "clickhouse" { - dsn = convertClickHouseDsn(dsn, p.Log) + db, err := gosql.Open(p.Driver, p.DataSourceName) + if err != nil { + return fmt.Errorf("creating database client failed: %w", err) } - db, err := gosql.Open(p.Driver, dsn) - if err != nil { - return err - } - - err = db.Ping() - if err != nil { - return err + if err := db.Ping(); err != nil { + return fmt.Errorf("pinging database failed: %w", err) } db.SetConnMaxIdleTime(time.Duration(p.ConnectionMaxIdleTime)) @@ -80,9 +117,8 @@ func (p *SQL) Connect() error { db.SetMaxOpenConns(p.ConnectionMaxOpen) if p.InitSQL != "" { - _, err = db.Exec(p.InitSQL) - if err != nil { - return err + if _, err = db.Exec(p.InitSQL); err != nil { + return fmt.Errorf("initializing database failed: %w", err) } } @@ -275,94 +311,64 @@ func (p *SQL) Write(metrics []telegraf.Metric) error { return nil } -func (p *SQL) Init() error { - if p.TableExistsTemplate == "" { - p.TableExistsTemplate = "SELECT 1 FROM {TABLE} LIMIT 1" - } - if p.TimestampColumn == "" { - p.TimestampColumn = "timestamp" - } - if p.TableTemplate == "" { - if p.Driver == "clickhouse" { - p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS}) ORDER BY ({TAG_COLUMN_NAMES}, {TIMESTAMP_COLUMN_NAME})" - } else { - p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})" - } - } - - return nil -} - -func init() { - outputs.Add("sql", func() telegraf.Output { return newSQL() }) -} - -func newSQL() *SQL { - return &SQL{ - Convert: ConvertStruct{ - Integer: "INT", - Real: "DOUBLE", - Text: "TEXT", - Timestamp: "TIMESTAMP", - Defaultvalue: "TEXT", - Unsigned: "UNSIGNED", - Bool: "BOOL", - ConversionStyle: "unsigned_suffix", - }, - // Defaults for the connection settings (ConnectionMaxIdleTime, - // ConnectionMaxLifetime, ConnectionMaxIdle, and ConnectionMaxOpen) - // mirror the golang defaults. As of go 1.18 all of them default to 0 - // except max idle connections which is 2. See - // https://pkg.go.dev/database/sql#DB.SetMaxIdleConns - ConnectionMaxIdle: 2, - } -} - // Convert a DSN possibly using v1 parameters to clickhouse-go v2 format -func convertClickHouseDsn(dsn string, log telegraf.Logger) string { - p, err := url.Parse(dsn) +func (p *SQL) convertClickHouseDsn() { + u, err := url.Parse(p.DataSourceName) if err != nil { - return dsn + return } - query := p.Query() + query := u.Query() // Log warnings for parameters no longer supported in clickhouse-go v2 unsupported := []string{"tls_config", "no_delay", "write_timeout", "block_size", "check_connection_liveness"} for _, paramName := range unsupported { if query.Has(paramName) { - log.Warnf("DSN parameter '%s' is no longer supported by clickhouse-go v2", paramName) + p.Log.Warnf("DSN parameter '%s' is no longer supported by clickhouse-go v2", paramName) query.Del(paramName) } } if query.Get("connection_open_strategy") == "time_random" { - log.Warn("DSN parameter 'connection_open_strategy' can no longer be 'time_random'") + p.Log.Warn("DSN parameter 'connection_open_strategy' can no longer be 'time_random'") } // Convert the read_timeout parameter to a duration string if d := query.Get("read_timeout"); d != "" { if _, err := strconv.ParseFloat(d, 64); err == nil { - log.Warn("Legacy DSN parameter 'read_timeout' interpreted as seconds") + p.Log.Warn("Legacy DSN parameter 'read_timeout' interpreted as seconds") query.Set("read_timeout", d+"s") } } // Move database to the path if d := query.Get("database"); d != "" { - log.Warn("Legacy DSN parameter 'database' converted to new format") + p.Log.Warn("Legacy DSN parameter 'database' converted to new format") query.Del("database") - p.Path = d + u.Path = d } // Move alt_hosts to the host part if altHosts := query.Get("alt_hosts"); altHosts != "" { - log.Warn("Legacy DSN parameter 'alt_hosts' converted to new format") + p.Log.Warn("Legacy DSN parameter 'alt_hosts' converted to new format") query.Del("alt_hosts") - p.Host = p.Host + "," + altHosts + u.Host = u.Host + "," + altHosts } - p.RawQuery = query.Encode() - dsn = p.String() - - return dsn + u.RawQuery = query.Encode() + p.DataSourceName = u.String() +} + +func init() { + outputs.Add("sql", func() telegraf.Output { + return &SQL{ + Convert: defaultConvert, + + // Defaults for the connection settings (ConnectionMaxIdleTime, + // ConnectionMaxLifetime, ConnectionMaxIdle, and ConnectionMaxOpen) + // mirror the golang defaults. As of go 1.18 all of them default to 0 + // except max idle connections which is 2. See + // https://pkg.go.dev/database/sql#DB.SetMaxIdleConns + ConnectionMaxIdle: 2, + } + }) } diff --git a/plugins/outputs/sql/sql_test.go b/plugins/outputs/sql/sql_test.go index 9bb67e750..62b9e55d9 100644 --- a/plugins/outputs/sql/sql_test.go +++ b/plugins/outputs/sql/sql_test.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "math/rand" "os" "path/filepath" "testing" @@ -19,36 +18,6 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func TestSqlQuoteIntegration(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } -} - -func TestSqlCreateStatementIntegration(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } -} - -func TestSqlInsertStatementIntegration(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } -} - -func pwgen(n int) string { - charset := []byte("abcdedfghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") - - nchars := len(charset) - buffer := make([]byte, 0, n) - for i := 0; i < n; i++ { - buffer = append(buffer, charset[rand.Intn(nchars)]) - } - - return string(buffer) -} - func stableMetric( name string, tags []telegraf.Tag, @@ -169,7 +138,7 @@ func TestMysqlIntegration(t *testing.T) { // var. We'll use root to insert and query test data. const username = "root" - password := pwgen(32) + password := testutil.GetRandomString(32) outDir := t.TempDir() servicePort := "3306" @@ -195,11 +164,14 @@ func TestMysqlIntegration(t *testing.T) { address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", username, password, container.Address, container.Ports[servicePort], dbname, ) - p := newSQL() - p.Log = testutil.Logger{} - p.Driver = "mysql" - p.DataSourceName = address - p.InitSQL = "SET sql_mode='ANSI_QUOTES';" + p := &SQL{ + Driver: "mysql", + DataSourceName: address, + Convert: defaultConvert, + InitSQL: "SET sql_mode='ANSI_QUOTES';", + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + } require.NoError(t, p.Init()) require.NoError(t, p.Connect()) @@ -249,7 +221,7 @@ func TestPostgresIntegration(t *testing.T) { // default username for postgres is postgres const username = "postgres" - password := pwgen(32) + password := testutil.GetRandomString(32) outDir := t.TempDir() servicePort := "5432" @@ -276,10 +248,13 @@ func TestPostgresIntegration(t *testing.T) { address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v", username, password, container.Address, container.Ports[servicePort], dbname, ) - p := newSQL() - p.Log = testutil.Logger{} - p.Driver = "pgx" - p.DataSourceName = address + p := &SQL{ + Driver: "pgx", + DataSourceName: address, + Convert: defaultConvert, + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + } p.Convert.Real = "double precision" p.Convert.Unsigned = "bigint" p.Convert.ConversionStyle = "literal" @@ -335,7 +310,7 @@ func TestClickHouseIntegration(t *testing.T) { // username for connecting to clickhouse const username = "clickhouse" - password := pwgen(32) + password := testutil.GetRandomString(32) outDir := t.TempDir() servicePort := "9000" @@ -364,10 +339,13 @@ func TestClickHouseIntegration(t *testing.T) { // host, port, username, password, dbname address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s", container.Address, container.Ports[servicePort], dbname, username, password) - p := newSQL() - p.Log = testutil.Logger{} - p.Driver = "clickhouse" - p.DataSourceName = address + p := &SQL{ + Driver: "clickhouse", + DataSourceName: address, + Convert: defaultConvert, + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + } p.Convert.Integer = "Int64" p.Convert.Text = "String" p.Convert.Timestamp = "DateTime" @@ -446,8 +424,13 @@ func TestClickHouseDsnConvert(t *testing.T) { }, } - log := testutil.Logger{} - for _, test := range tests { - require.Equal(t, test.expected, convertClickHouseDsn(test.input, log)) + for _, tt := range tests { + plugin := &SQL{ + Driver: "clickhouse", + DataSourceName: tt.input, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.Equal(t, tt.expected, plugin.DataSourceName) } } diff --git a/plugins/outputs/sql/sqlite_test.go b/plugins/outputs/sql/sqlite_test.go index 051525ad6..4bcba28b6 100644 --- a/plugins/outputs/sql/sqlite_test.go +++ b/plugins/outputs/sql/sqlite_test.go @@ -21,10 +21,13 @@ func TestSqlite(t *testing.T) { // Use the plugin to write to the database address := // fmt.Sprintf("file:%v", dbfile) address := dbfile // accepts a path or a file: URI - p := newSQL() - p.Log = testutil.Logger{} - p.Driver = "sqlite" - p.DataSourceName = address + p := &SQL{ + Driver: "sqlite", + DataSourceName: address, + Convert: defaultConvert, + ConnectionMaxIdle: 2, + Log: testutil.Logger{}, + } require.NoError(t, p.Init()) require.NoError(t, p.Connect()) diff --git a/testutil/testutil.go b/testutil/testutil.go index 7925a17d3..6c44107b1 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -2,6 +2,7 @@ package testutil import ( "fmt" + "math/rand" "net" "net/url" "os" @@ -42,6 +43,24 @@ func GetLocalHost() string { return localhost } +// GetRandomString returns a random alphanumerical string of the given length. +// Please note, this function is different to `internal.RandomString` as it will +// not use `crypto.Rand` and will therefore not rely on the entropy-pool of the +// host which might be drained e.g. in CI pipelines. This is useful to e.g. +// create random passwords for tests where security is not a concern. +func GetRandomString(chars int) string { + charset := []byte("abcdefghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + + nchars := len(charset) + buffer := make([]byte, chars) + for i := range chars { + //nolint:gosec // Using a weak random number generator on purpose to not drain entropy + buffer[i] = charset[rand.Intn(nchars)] + } + + return string(buffer) +} + // MockMetrics returns a mock []telegraf.Metric object for using in unit tests // of telegraf output sinks. func MockMetrics() []telegraf.Metric {