feat(inputs.mysql): add secret-store support (#12591)

This commit is contained in:
Sven Rebhan 2023-02-02 00:09:58 +01:00 committed by GitHub
parent e84bc0c590
commit 177ce5eea3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 70 deletions

View File

@ -25,7 +25,7 @@ import (
var sampleConfig string
type Mysql struct {
Servers []string `toml:"servers"`
Servers []config.Secret `toml:"servers"`
PerfEventsStatementsDigestTextLimit int64 `toml:"perf_events_statements_digest_text_limit"`
PerfEventsStatementsLimit int64 `toml:"perf_events_statements_limit"`
PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statements_time_limit"`
@ -79,7 +79,7 @@ func (m *Mysql) Init() error {
// Default to localhost if nothing specified.
if len(m.Servers) == 0 {
m.Servers = append(m.Servers, localhost)
m.Servers = append(m.Servers, config.NewSecret([]byte(localhost)))
}
// Register the TLS configuration. Due to the registry being a global
@ -102,7 +102,13 @@ func (m *Mysql) Init() error {
}
// Adapt the DSN string
for i, dsn := range m.Servers {
for i, server := range m.Servers {
s, err := server.Get()
if err != nil {
return fmt.Errorf("getting server %d failed", i)
}
dsn := string(s)
config.ReleaseSecret(s)
conf, err := mysql.ParseDSN(dsn)
if err != nil {
return fmt.Errorf("parsing %q failed: %w", dsn, err)
@ -118,7 +124,8 @@ func (m *Mysql) Init() error {
conf.TLSConfig = tlsid
}
m.Servers[i] = conf.FormatDSN()
server.Destroy()
m.Servers[i] = config.NewSecret([]byte(conf.FormatDSN()))
}
return nil
@ -130,7 +137,7 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error {
// Loop through each server and collect metrics
for _, server := range m.Servers {
wg.Add(1)
go func(s string) {
go func(s config.Secret) {
defer wg.Done()
acc.AddError(m.gatherServer(s, acc))
}(server)
@ -403,14 +410,22 @@ const (
`
)
func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
db, err := sql.Open("mysql", serv)
func (m *Mysql) gatherServer(server config.Secret, acc telegraf.Accumulator) error {
s, err := server.Get()
if err != nil {
return err
}
dsn := string(s)
config.ReleaseSecret(s)
servtag := getDSNTag(dsn)
db, err := sql.Open("mysql", dsn)
if err != nil {
return err
}
defer db.Close()
err = m.gatherGlobalStatuses(db, serv, acc)
err = m.gatherGlobalStatuses(db, servtag, acc)
if err != nil {
return err
}
@ -419,7 +434,7 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
// Global Variables may be gathered less often
interval := time.Duration(m.IntervalSlow)
if interval >= time.Second && time.Since(m.lastT) >= interval {
if err := m.gatherGlobalVariables(db, serv, acc); err != nil {
if err := m.gatherGlobalVariables(db, servtag, acc); err != nil {
return err
}
m.lastT = time.Now()
@ -427,98 +442,98 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
}
if m.GatherBinaryLogs {
err = m.gatherBinaryLogs(db, serv, acc)
err = m.gatherBinaryLogs(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherProcessList {
err = m.GatherProcessListStatuses(db, serv, acc)
err = m.gatherProcessListStatuses(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherUserStatistics {
err = m.GatherUserStatisticsStatuses(db, serv, acc)
err = m.gatherUserStatisticsStatuses(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherSlaveStatus {
err = m.gatherSlaveStatuses(db, serv, acc)
err = m.gatherSlaveStatuses(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherInfoSchemaAutoInc {
err = m.gatherInfoSchemaAutoIncStatuses(db, serv, acc)
err = m.gatherInfoSchemaAutoIncStatuses(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherInnoDBMetrics {
err = m.gatherInnoDBMetrics(db, serv, acc)
err = m.gatherInnoDBMetrics(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherPerfSummaryPerAccountPerEvent {
err = m.gatherPerfSummaryPerAccountPerEvent(db, serv, acc)
err = m.gatherPerfSummaryPerAccountPerEvent(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherTableIOWaits {
err = m.gatherPerfTableIOWaits(db, serv, acc)
err = m.gatherPerfTableIOWaits(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherIndexIOWaits {
err = m.gatherPerfIndexIOWaits(db, serv, acc)
err = m.gatherPerfIndexIOWaits(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherTableLockWaits {
err = m.gatherPerfTableLockWaits(db, serv, acc)
err = m.gatherPerfTableLockWaits(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherEventWaits {
err = m.gatherPerfEventWaits(db, serv, acc)
err = m.gatherPerfEventWaits(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherFileEventsStats {
err = m.gatherPerfFileEventsStatuses(db, serv, acc)
err = m.gatherPerfFileEventsStatuses(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherPerfEventsStatements {
err = m.gatherPerfEventsStatements(db, serv, acc)
err = m.gatherPerfEventsStatements(db, servtag, acc)
if err != nil {
return err
}
}
if m.GatherTableSchema {
err = m.gatherTableSchema(db, serv, acc)
err = m.gatherTableSchema(db, servtag, acc)
if err != nil {
return err
}
@ -528,7 +543,7 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
// gatherGlobalVariables can be used to fetch all global variables from
// MySQL environment.
func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherGlobalVariables(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(globalVariablesQuery)
if err != nil {
@ -540,7 +555,6 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
var val sql.RawBytes
// parse DSN and save server tag
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() {
@ -591,7 +605,7 @@ func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{
// When the server is slave, then it returns only one row.
// If the multi-source replication is set, then everything works differently
// This code does not work with multi-source replication.
func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherSlaveStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
var rows *sql.Rows
var err error
@ -602,8 +616,6 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
}
defer rows.Close()
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
@ -681,7 +693,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// gatherBinaryLogs can be used to collect size and count of all binary files
// binlogs metric requires the MySQL server to turn it on in configuration
func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherBinaryLogs(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(binaryLogsQuery)
if err != nil {
@ -690,7 +702,6 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
defer rows.Close()
// parse DSN and save host as a tag
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
var (
size uint64
@ -733,7 +744,7 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
// gatherGlobalStatuses can be used to get MySQL status metrics
// the mappings of actual names and names of each status to be exported
// to output is provided on mappings variable
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherGlobalStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(globalStatusQuery)
if err != nil {
@ -742,7 +753,6 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
defer rows.Close()
// parse the DSN and save host name as a tag
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() {
@ -843,7 +853,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
// GatherProcessList can be used to collect metrics on each running command
// and its state with its running count
func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherProcessListStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(infoSchemaProcessListQuery)
if err != nil {
@ -857,9 +867,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
count uint32
)
var servtag string
fields := make(map[string]interface{})
servtag = getDSNTag(serv)
// mapping of state with its counts
stateCounts := make(map[string]uint32, len(generalThreadStates))
@ -917,7 +925,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
// GatherUserStatisticsStatuses can be used to collect metrics on each running command
// and its state with its running count
func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherUserStatisticsStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil {
@ -940,7 +948,6 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
return err
}
servtag := getDSNTag(serv)
for rows.Next() {
err = rows.Scan(read...)
if err != nil {
@ -1131,7 +1138,7 @@ func getColSlice(l int) ([]interface{}, error) {
// gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
rows, err := db.Query(perfTableIOWaitsQuery)
if err != nil {
return err
@ -1139,13 +1146,11 @@ func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Acc
defer rows.Close()
var (
objSchema, objName, servtag string
objSchema, objName string
countFetch, countInsert, countUpdate, countDelete float64
timeFetch, timeInsert, timeUpdate, timeDelete float64
)
servtag = getDSNTag(serv)
for rows.Next() {
err = rows.Scan(&objSchema, &objName,
&countFetch, &countInsert, &countUpdate, &countDelete,
@ -1180,7 +1185,7 @@ func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Acc
// gatherPerfIndexIOWaits can be used to get total count and time
// of I/O wait event for each index and process
func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
rows, err := db.Query(perfIndexIOWaitsQuery)
if err != nil {
return err
@ -1188,13 +1193,11 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc
defer rows.Close()
var (
objSchema, objName, indexName, servtag string
objSchema, objName, indexName string
countFetch, countInsert, countUpdate, countDelete float64
timeFetch, timeInsert, timeUpdate, timeDelete float64
)
servtag = getDSNTag(serv)
for rows.Next() {
err = rows.Scan(&objSchema, &objName, &indexName,
&countFetch, &countInsert, &countUpdate, &countDelete,
@ -1233,7 +1236,7 @@ func (m *Mysql) gatherPerfIndexIOWaits(db *sql.DB, serv string, acc telegraf.Acc
}
// gatherInfoSchemaAutoIncStatuses can be used to get auto incremented values of the column
func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
rows, err := db.Query(infoSchemaAutoIncQuery)
if err != nil {
return err
@ -1245,8 +1248,6 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
incValue, maxInt uint64
)
servtag := getDSNTag(serv)
for rows.Next() {
if err := rows.Scan(&schema, &table, &column, &incValue, &maxInt); err != nil {
return err
@ -1272,7 +1273,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
// gatherInnoDBMetrics can be used to fetch enabled metrics from
// information_schema.INNODB_METRICS
func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
var (
query string
)
@ -1291,7 +1292,6 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
defer rows.Close()
// parse DSN and save server tag
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() {
@ -1325,7 +1325,7 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
// gatherPerfSummaryPerAccountPerEvent can be used to fetch enabled metrics from
// performance_schema.events_statements_summary_by_account_by_event_name
func (m *Mysql) gatherPerfSummaryPerAccountPerEvent(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfSummaryPerAccountPerEvent(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
sqlQuery := perfSummaryPerAccountPerEvent
var rows *sql.Rows
@ -1386,7 +1386,6 @@ func (m *Mysql) gatherPerfSummaryPerAccountPerEvent(db *sql.DB, serv string, acc
defer rows.Close()
// parse DSN and save server tag
servtag := getDSNTag(serv)
tags := map[string]string{"server": servtag}
for rows.Next() {
if err := rows.Scan(
@ -1463,7 +1462,7 @@ func (m *Mysql) gatherPerfSummaryPerAccountPerEvent(db *sql.DB, serv string, acc
// the total number and time for SQL and external lock wait events
// for each table and operation
// requires the MySQL server to be enabled to save this metric
func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
// check if table exists,
// if performance_schema is not enabled, tables do not exist
// then there is no need to scan them
@ -1482,8 +1481,6 @@ func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.A
}
defer rows.Close()
servtag := getDSNTag(serv)
var (
objectSchema string
objectName string
@ -1592,7 +1589,7 @@ func (m *Mysql) gatherPerfTableLockWaits(db *sql.DB, serv string, acc telegraf.A
}
// gatherPerfEventWaits can be used to get total time and number of event waits
func (m *Mysql) gatherPerfEventWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfEventWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
rows, err := db.Query(perfEventWaitsQuery)
if err != nil {
return err
@ -1604,7 +1601,6 @@ func (m *Mysql) gatherPerfEventWaits(db *sql.DB, serv string, acc telegraf.Accum
starCount, timeWait float64
)
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@ -1624,7 +1620,7 @@ func (m *Mysql) gatherPerfEventWaits(db *sql.DB, serv string, acc telegraf.Accum
}
// gatherPerfFileEvents can be used to get stats on file events
func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
rows, err := db.Query(perfFileEventsQuery)
if err != nil {
return err
@ -1639,7 +1635,6 @@ func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegr
sumNumBytesRead, sumNumBytesWrite float64
)
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@ -1682,7 +1677,7 @@ func (m *Mysql) gatherPerfFileEventsStatuses(db *sql.DB, serv string, acc telegr
}
// gatherPerfEventsStatements can be used to get attributes of each event
func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
query := fmt.Sprintf(
perfEventsStatementsQuery,
m.PerfEventsStatementsDigestTextLimit,
@ -1706,7 +1701,6 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
noIndexUsed float64
)
servtag := getDSNTag(serv)
tags := map[string]string{
"server": servtag,
}
@ -1749,9 +1743,8 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf
}
// gatherTableSchema can be used to gather stats on each schema
func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumulator) error {
func (m *Mysql) gatherTableSchema(db *sql.DB, servtag string, acc telegraf.Accumulator) error {
var dbList []string
servtag := getDSNTag(serv)
// if the list of databases if empty, then get all databases
if len(m.TableSchemaDatabases) == 0 {

View File

@ -36,8 +36,9 @@ func TestMysqlDefaultsToLocalIntegration(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
dsn := fmt.Sprintf("root@tcp(%s:%s)/", container.Address, container.Ports[servicePort])
m := &Mysql{
Servers: []string{fmt.Sprintf("root@tcp(%s:%s)/", container.Address, container.Ports[servicePort])},
Servers: []config.Secret{config.NewSecret([]byte(dsn))},
}
require.NoError(t, m.Init())
@ -70,9 +71,9 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
testServer := fmt.Sprintf("root@tcp(%s:%s)/?tls=false", container.Address, container.Ports[servicePort])
dsn := fmt.Sprintf("root@tcp(%s:%s)/?tls=false", container.Address, container.Ports[servicePort])
m := &Mysql{
Servers: []string{testServer},
Servers: []config.Secret{config.NewSecret([]byte(dsn))},
IntervalSlow: config.Duration(30 * time.Second),
GatherGlobalVars: true,
MetricVersion: 2,
@ -87,7 +88,7 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) {
require.True(t, acc.HasMeasurement("mysql_variables"))
m2 := &Mysql{
Servers: []string{testServer},
Servers: []config.Secret{config.NewSecret([]byte(dsn))},
MetricVersion: 2,
}
require.NoError(t, m2.Init())
@ -204,9 +205,13 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Mysql{Servers: []string{tt.input}}
m := &Mysql{
Servers: []config.Secret{config.NewSecret([]byte(tt.input))},
}
require.NoError(t, m.Init())
require.Equal(t, tt.output, m.Servers[0])
equal, err := m.Servers[0].EqualTo([]byte(tt.output))
require.NoError(t, err)
require.True(t, equal)
})
}
}
@ -314,7 +319,7 @@ func TestGatherGlobalVariables(t *testing.T) {
acc := &testutil.Accumulator{}
err = m.gatherGlobalVariables(db, "test", acc)
err := m.gatherGlobalVariables(db, getDSNTag("test"), acc)
require.NoErrorf(t, err, "err on gatherGlobalVariables (test case %q)", testCase.name)
foundFields := map[string]bool{}