feat(outputs.sql): Support secret for dsn (#16678)
This commit is contained in:
parent
be9e5bfbb3
commit
799e194700
|
|
@ -76,6 +76,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Secret-store support
|
||||||
|
|
||||||
|
This plugin supports secrets from secret-stores for the `data_source_name`
|
||||||
|
option. See the [secret-store documentation][SECRETSTORE] for more details on
|
||||||
|
how to use them.
|
||||||
|
|
||||||
|
[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
```toml @sample.conf
|
```toml @sample.conf
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ type ConvertStruct struct {
|
||||||
|
|
||||||
type SQL struct {
|
type SQL struct {
|
||||||
Driver string `toml:"driver"`
|
Driver string `toml:"driver"`
|
||||||
DataSourceName string `toml:"data_source_name"`
|
DataSourceName config.Secret `toml:"data_source_name"`
|
||||||
TimestampColumn string `toml:"timestamp_column"`
|
TimestampColumn string `toml:"timestamp_column"`
|
||||||
TableTemplate string `toml:"table_template"`
|
TableTemplate string `toml:"table_template"`
|
||||||
TableExistsTemplate string `toml:"table_exists_template"`
|
TableExistsTemplate string `toml:"table_exists_template"`
|
||||||
|
|
@ -105,7 +105,14 @@ func (p *SQL) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SQL) Connect() error {
|
func (p *SQL) Connect() error {
|
||||||
db, err := gosql.Open(p.Driver, p.DataSourceName)
|
dsnBuffer, err := p.DataSourceName.Get()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("loading data source name secret failed: %w", err)
|
||||||
|
}
|
||||||
|
dsn := dsnBuffer.String()
|
||||||
|
dsnBuffer.Destroy()
|
||||||
|
|
||||||
|
db, err := gosql.Open(p.Driver, dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating database client failed: %w", err)
|
return fmt.Errorf("creating database client failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -401,7 +408,15 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
// Convert a DSN possibly using v1 parameters to clickhouse-go v2 format
|
// Convert a DSN possibly using v1 parameters to clickhouse-go v2 format
|
||||||
func (p *SQL) convertClickHouseDsn() {
|
func (p *SQL) convertClickHouseDsn() {
|
||||||
u, err := url.Parse(p.DataSourceName)
|
dsnBuffer, err := p.DataSourceName.Get()
|
||||||
|
if err != nil {
|
||||||
|
p.Log.Errorf("loading data source name failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dsn := dsnBuffer.String()
|
||||||
|
dsnBuffer.Destroy()
|
||||||
|
|
||||||
|
u, err := url.Parse(dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -443,7 +458,9 @@ func (p *SQL) convertClickHouseDsn() {
|
||||||
}
|
}
|
||||||
|
|
||||||
u.RawQuery = query.Encode()
|
u.RawQuery = query.Encode()
|
||||||
p.DataSourceName = u.String()
|
if err := p.DataSourceName.Set([]byte(u.String())); err != nil {
|
||||||
|
p.Log.Errorf("updating data source name to click house dsn failed: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
@ -181,9 +182,9 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
address := config.NewSecret([]byte(fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "mysql",
|
Driver: "mysql",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -266,9 +267,9 @@ func TestMysqlUpdateSchemeIntegration(t *testing.T) {
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
address := config.NewSecret([]byte(fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "mysql",
|
Driver: "mysql",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -350,9 +351,9 @@ func TestPostgresIntegration(t *testing.T) {
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
address := config.NewSecret([]byte(fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "pgx",
|
Driver: "pgx",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -437,9 +438,9 @@ func TestPostgresUpdateSchemeIntegration(t *testing.T) {
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
address := config.NewSecret([]byte(fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "pgx",
|
Driver: "pgx",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -537,8 +538,8 @@ func TestClickHouseIntegration(t *testing.T) {
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
|
address := config.NewSecret([]byte(fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
|
||||||
container.Address, container.Ports[servicePort], dbname, username, password)
|
container.Address, container.Ports[servicePort], dbname, username, password)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "clickhouse",
|
Driver: "clickhouse",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -632,8 +633,8 @@ func TestClickHouseUpdateSchemeIntegration(t *testing.T) {
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
|
address := config.NewSecret([]byte(fmt.Sprintf("tcp://%s:%s/%s?username=%s&password=%s",
|
||||||
container.Address, container.Ports[servicePort], dbname, username, password)
|
container.Address, container.Ports[servicePort], dbname, username, password)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "clickhouse",
|
Driver: "clickhouse",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -723,11 +724,15 @@ func TestClickHouseDsnConvert(t *testing.T) {
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
plugin := &SQL{
|
plugin := &SQL{
|
||||||
Driver: "clickhouse",
|
Driver: "clickhouse",
|
||||||
DataSourceName: tt.input,
|
DataSourceName: config.NewSecret([]byte(tt.input)),
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Init())
|
require.NoError(t, plugin.Init())
|
||||||
require.Equal(t, tt.expected, plugin.DataSourceName)
|
resolvedSecret, err := plugin.DataSourceName.Get()
|
||||||
|
require.NoError(t, err)
|
||||||
|
resolvedDsn := resolvedSecret.String()
|
||||||
|
resolvedSecret.Destroy()
|
||||||
|
require.Equal(t, tt.expected, resolvedDsn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -769,9 +774,9 @@ func TestMysqlEmptyTimestampColumnIntegration(t *testing.T) {
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
// use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
address := config.NewSecret([]byte(fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)))
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "mysql",
|
Driver: "mysql",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -23,7 +24,7 @@ func TestSqlite(t *testing.T) {
|
||||||
address := dbfile // accepts a path or a file: URI
|
address := dbfile // accepts a path or a file: URI
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "sqlite",
|
Driver: "sqlite",
|
||||||
DataSourceName: address,
|
DataSourceName: config.NewSecret([]byte(address)),
|
||||||
Convert: defaultConvert,
|
Convert: defaultConvert,
|
||||||
TimestampColumn: "timestamp",
|
TimestampColumn: "timestamp",
|
||||||
ConnectionMaxIdle: 2,
|
ConnectionMaxIdle: 2,
|
||||||
|
|
@ -140,7 +141,7 @@ func TestSqliteUpdateScheme(t *testing.T) {
|
||||||
|
|
||||||
// Use the plugin to write to the database address :=
|
// Use the plugin to write to the database address :=
|
||||||
// fmt.Sprintf("file:%v", dbfile)
|
// fmt.Sprintf("file:%v", dbfile)
|
||||||
address := dbfile // accepts a path or a file: URI
|
address := config.NewSecret([]byte(dbfile)) // accepts a path or a file: URI
|
||||||
p := &SQL{
|
p := &SQL{
|
||||||
Driver: "sqlite",
|
Driver: "sqlite",
|
||||||
DataSourceName: address,
|
DataSourceName: address,
|
||||||
|
|
@ -157,7 +158,7 @@ func TestSqliteUpdateScheme(t *testing.T) {
|
||||||
require.NoError(t, p.Write(testMetrics))
|
require.NoError(t, p.Write(testMetrics))
|
||||||
|
|
||||||
// read directly from the database
|
// read directly from the database
|
||||||
db, err := gosql.Open("sqlite", address)
|
db, err := gosql.Open("sqlite", dbfile)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue