fix(inputs.mysql): Use correct column-types for Percona 8 userstats (#15012)
This commit is contained in:
parent
b8936a83cb
commit
13c786bdfa
|
|
@ -6,6 +6,7 @@ import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -947,7 +948,7 @@ func (m *Mysql) gatherUserStatisticsStatuses(db *sql.DB, servtag string, acc tel
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
read, err := getColSlice(len(cols))
|
read, err := getColSlice(rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -995,7 +996,13 @@ func columnsToLower(s []string, e error) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getColSlice returns an in interface slice that can be used in the row.Scan().
|
// getColSlice returns an in interface slice that can be used in the row.Scan().
|
||||||
func getColSlice(l int) ([]interface{}, error) {
|
func getColSlice(rows *sql.Rows) ([]interface{}, error) {
|
||||||
|
columnTypes, err := rows.ColumnTypes()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
l := len(columnTypes)
|
||||||
|
|
||||||
// list of all possible column names
|
// list of all possible column names
|
||||||
var (
|
var (
|
||||||
user string
|
user string
|
||||||
|
|
@ -1111,30 +1118,26 @@ func getColSlice(l int) ([]interface{}, error) {
|
||||||
&emptyQueries,
|
&emptyQueries,
|
||||||
}, nil
|
}, nil
|
||||||
case 22: // percona
|
case 22: // percona
|
||||||
return []interface{}{
|
cols := make([]interface{}, 0, 22)
|
||||||
&user,
|
for i, ct := range columnTypes {
|
||||||
&totalConnections,
|
// The first column is the user and has to be a string
|
||||||
&concurrentConnections,
|
if i == 0 {
|
||||||
&connectedTime,
|
cols = append(cols, new(string))
|
||||||
&busyTime,
|
continue
|
||||||
&cpuTime,
|
}
|
||||||
&bytesReceived,
|
|
||||||
&bytesSent,
|
// Percona 8 has some special fields that are float instead of ints
|
||||||
&binlogBytesWritten,
|
// see: https://github.com/influxdata/telegraf/issues/7360
|
||||||
&rowsFetched,
|
switch ct.ScanType().Kind() {
|
||||||
&rowsUpdated,
|
case reflect.Float32, reflect.Float64:
|
||||||
&tableRowsRead,
|
cols = append(cols, new(float64))
|
||||||
&selectCommands,
|
default:
|
||||||
&updateCommands,
|
// Keep old type for backward compatibility
|
||||||
&otherCommands,
|
cols = append(cols, new(int64))
|
||||||
&commitTransactions,
|
}
|
||||||
&rollbackTransactions,
|
}
|
||||||
&deniedConnections,
|
|
||||||
&lostConnections,
|
return cols, nil
|
||||||
&accessDenied,
|
|
||||||
&emptyQueries,
|
|
||||||
&totalSslConnections,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("not Supported - %d columns", l)
|
return nil, fmt.Errorf("not Supported - %d columns", l)
|
||||||
|
|
|
||||||
|
|
@ -104,6 +104,43 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) {
|
||||||
require.False(t, acc2.HasMeasurement("mysql_variables"))
|
require.False(t, acc2.HasMeasurement("mysql_variables"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPercona8Integration(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
container := testutil.Container{
|
||||||
|
Image: "percona:8",
|
||||||
|
Env: map[string]string{
|
||||||
|
"MYSQL_ROOT_PASSWORD": "secret",
|
||||||
|
},
|
||||||
|
Cmd: []string{"--userstat=ON"},
|
||||||
|
ExposedPorts: []string{servicePort},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("/usr/sbin/mysqld: ready for connections").WithOccurrence(2),
|
||||||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
require.NoError(t, container.Start(), "failed to start container")
|
||||||
|
defer container.Terminate()
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("root:secret@tcp(%s:%s)/", container.Address, container.Ports[servicePort])
|
||||||
|
s := config.NewSecret([]byte(dsn))
|
||||||
|
plugin := &Mysql{
|
||||||
|
Servers: []*config.Secret{&s},
|
||||||
|
GatherUserStatistics: true,
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, plugin.Gather(&acc))
|
||||||
|
require.Empty(t, acc.Errors)
|
||||||
|
require.True(t, acc.HasMeasurement("mysql_user_stats"))
|
||||||
|
require.True(t, acc.HasFloatField("mysql_user_stats", "connected_time"))
|
||||||
|
require.True(t, acc.HasFloatField("mysql_user_stats", "cpu_time"))
|
||||||
|
require.True(t, acc.HasFloatField("mysql_user_stats", "busy_time"))
|
||||||
|
}
|
||||||
|
|
||||||
func TestMysqlGetDSNTag(t *testing.T) {
|
func TestMysqlGetDSNTag(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
input string
|
input string
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue