fix: mysql: type conversion follow-up (#9966)

This commit is contained in:
Felix Edelmann 2021-11-09 23:30:42 +01:00 committed by GitHub
parent 8a3ba85419
commit f7827a0408
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 75 deletions

View File

@ -1,7 +1,6 @@
package mysql package mysql
import ( import (
"bytes"
"database/sql" "database/sql"
"fmt" "fmt"
"strconv" "strconv"
@ -638,7 +637,12 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
value, err := m.parseGlobalVariables(key, val) value, err := m.parseGlobalVariables(key, val)
if err != nil { if err != nil {
m.Log.Debugf("Error parsing global variable %q: %v", key, err) errString := fmt.Errorf("error parsing mysql global variable %q=%q: %v", key, string(val), err)
if m.MetricVersion < 2 {
m.Log.Debug(errString)
} else {
acc.AddError(errString)
}
} else { } else {
fields[key] = value fields[key] = value
} }
@ -658,11 +662,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) { func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 { if m.MetricVersion < 2 {
v, ok := v1.ParseValue(value) return v1.ParseValue(value)
if ok {
return v, nil
}
return v, fmt.Errorf("could not parse value: %q", string(value))
} }
return v2.ConvertGlobalVariables(key, value) return v2.ConvertGlobalVariables(key, value)
} }
@ -693,35 +693,58 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// scanning keys and values separately // scanning keys and values separately
// get columns names, and create an array with its length // get columns names, and create an array with its length
cols, err := rows.Columns() cols, err := rows.ColumnTypes()
if err != nil { if err != nil {
return err return err
} }
vals := make([]interface{}, len(cols)) vals := make([]sql.RawBytes, len(cols))
valPtrs := make([]interface{}, len(cols))
// fill the array with sql.Rawbytes // fill the array with sql.Rawbytes
for i := range vals { for i := range vals {
vals[i] = &sql.RawBytes{} vals[i] = sql.RawBytes{}
valPtrs[i] = &vals[i]
} }
if err = rows.Scan(vals...); err != nil { if err = rows.Scan(valPtrs...); err != nil {
return err return err
} }
// range over columns, and try to parse values // range over columns, and try to parse values
for i, col := range cols { for i, col := range cols {
colName := col.Name()
if m.MetricVersion >= 2 { if m.MetricVersion >= 2 {
col = strings.ToLower(col) colName = strings.ToLower(colName)
} }
colValue := vals[i]
if m.GatherAllSlaveChannels && if m.GatherAllSlaveChannels &&
(strings.ToLower(col) == "channel_name" || strings.ToLower(col) == "connection_name") { (strings.ToLower(colName) == "channel_name" || strings.ToLower(colName) == "connection_name") {
// Since the default channel name is empty, we need this block // Since the default channel name is empty, we need this block
channelName := "default" channelName := "default"
if len(*vals[i].(*sql.RawBytes)) > 0 { if len(colValue) > 0 {
channelName = string(*vals[i].(*sql.RawBytes)) channelName = string(colValue)
} }
tags["channel"] = channelName tags["channel"] = channelName
} else if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok { continue
fields["slave_"+col] = value
} }
if colValue == nil || len(colValue) == 0 {
continue
}
value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName())
if err != nil {
errString := fmt.Errorf("error parsing mysql slave status %q=%q: %v", colName, string(colValue), err)
if m.MetricVersion < 2 {
m.Log.Debug(errString)
} else {
acc.AddError(errString)
}
continue
}
fields["slave_"+colName] = value
} }
acc.AddFields("mysql", fields, tags) acc.AddFields("mysql", fields, tags)
@ -877,7 +900,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
key = strings.ToLower(key) key = strings.ToLower(key)
value, err := v2.ConvertGlobalStatus(key, val) value, err := v2.ConvertGlobalStatus(key, val)
if err != nil { if err != nil {
m.Log.Debugf("Error parsing global status: %v", err) acc.AddError(fmt.Errorf("error parsing mysql global status %q=%q: %v", key, string(val), err))
} else { } else {
fields[key] = value fields[key] = value
} }
@ -1346,10 +1369,16 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
if err := rows.Scan(&key, &val); err != nil { if err := rows.Scan(&key, &val); err != nil {
return err return err
} }
key = strings.ToLower(key) key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok { value, err := m.parseValueByDatabaseTypeName(val, "BIGINT")
fields[key] = value if err != nil {
acc.AddError(fmt.Errorf("error parsing mysql InnoDB metric %q=%q: %v", key, string(val), err))
continue
} }
fields[key] = value
// Send 20 fields at a time // Send 20 fields at a time
if len(fields) >= 20 { if len(fields) >= 20 {
acc.AddFields("mysql_innodb", fields, tags) acc.AddFields("mysql_innodb", fields, tags)
@ -1914,34 +1943,22 @@ func (m *Mysql) gatherSchemaForDB(db *sql.DB, database string, servtag string, a
return nil return nil
} }
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) { func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) {
if m.MetricVersion < 2 { if m.MetricVersion < 2 {
return v1.ParseValue(value) return v1.ParseValue(value)
} }
return parseValue(value)
}
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1 switch databaseTypeName {
func parseValue(value sql.RawBytes) (interface{}, bool) { case "INT":
if bytes.EqualFold(value, []byte("YES")) || bytes.Equal(value, []byte("ON")) { return v2.ParseInt(value)
return 1, true case "BIGINT":
return v2.ParseUint(value)
case "VARCHAR":
return v2.ParseString(value)
default:
m.Log.Debugf("unknown database type name %q in parseValueByDatabaseTypeName", databaseTypeName)
return v2.ParseValue(value)
} }
if bytes.EqualFold(value, []byte("NO")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
}
if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
return val, true
}
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
return val, true
}
if len(string(value)) > 0 {
return string(value), true
}
return nil, false
} }
// findThreadState can be used to find thread state by command and plain state // findThreadState can be used to find thread state by command and plain state

View File

@ -1,7 +1,6 @@
package mysql package mysql
import ( import (
"database/sql"
"fmt" "fmt"
"testing" "testing"
@ -178,31 +177,7 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
} }
} }
} }
func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
boolValue bool
}{
{sql.RawBytes("123"), int64(123), true},
{sql.RawBytes("abc"), "abc", true},
{sql.RawBytes("10.1"), 10.1, true},
{sql.RawBytes("ON"), 1, true},
{sql.RawBytes("OFF"), 0, true},
{sql.RawBytes("NO"), 0, true},
{sql.RawBytes("YES"), 1, true},
{sql.RawBytes("No"), 0, true},
{sql.RawBytes("Yes"), 1, true},
{sql.RawBytes("-794"), int64(-794), true},
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), true},
{sql.RawBytes(""), nil, false},
}
for _, cases := range testCases {
if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
}
}
}
func TestNewNamespace(t *testing.T) { func TestNewNamespace(t *testing.T) {
testCases := []struct { testCases := []struct {
words []string words []string

View File

@ -182,14 +182,14 @@ var Mappings = []*Mapping{
}, },
} }
func ParseValue(value sql.RawBytes) (float64, bool) { func ParseValue(value sql.RawBytes) (float64, error) {
if bytes.Equal(value, []byte("Yes")) || bytes.Equal(value, []byte("ON")) { if bytes.Equal(value, []byte("Yes")) || bytes.Equal(value, []byte("ON")) {
return 1, true return 1, nil
} }
if bytes.Equal(value, []byte("No")) || bytes.Equal(value, []byte("OFF")) { if bytes.Equal(value, []byte("No")) || bytes.Equal(value, []byte("OFF")) {
return 0, true return 0, nil
} }
n, err := strconv.ParseFloat(string(value), 64) n, err := strconv.ParseFloat(string(value), 64)
return n, err == nil return n, err
} }

View File

@ -25,6 +25,10 @@ func ParseUint(value sql.RawBytes) (interface{}, error) {
return strconv.ParseUint(string(value), 10, 64) return strconv.ParseUint(string(value), 10, 64)
} }
func ParseFloat(value sql.RawBytes) (interface{}, error) {
return strconv.ParseFloat(string(value), 64)
}
func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) { func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) { if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
return int64(1), nil return int64(1), nil
@ -86,11 +90,15 @@ var GlobalStatusConversions = map[string]ConversionFunc{
"innodb_data_pending_fsyncs": ParseUint, "innodb_data_pending_fsyncs": ParseUint,
"ssl_ctx_verify_depth": ParseUint, "ssl_ctx_verify_depth": ParseUint,
"ssl_verify_depth": ParseUint, "ssl_verify_depth": ParseUint,
// see https://galeracluster.com/library/documentation/galera-status-variables.html
"wsrep_local_index": ParseUint,
"wsrep_local_send_queue_avg": ParseFloat,
} }
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
var GlobalVariableConversions = map[string]ConversionFunc{ var GlobalVariableConversions = map[string]ConversionFunc{
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
"delay_key_write": ParseString, // ON, OFF, ALL "delay_key_write": ParseString, // ON, OFF, ALL
"enforce_gtid_consistency": ParseString, // ON, OFF, WARN "enforce_gtid_consistency": ParseString, // ON, OFF, WARN
"event_scheduler": ParseString, // YES, NO, DISABLED "event_scheduler": ParseString, // YES, NO, DISABLED

View File

@ -2,6 +2,7 @@ package v2
import ( import (
"database/sql" "database/sql"
"strings"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -84,3 +85,43 @@ func TestCovertGlobalVariables(t *testing.T) {
}) })
} }
} }
func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
err string
}{
{sql.RawBytes("123"), int64(123), ""},
{sql.RawBytes("abc"), "abc", ""},
{sql.RawBytes("10.1"), 10.1, ""},
{sql.RawBytes("ON"), 1, ""},
{sql.RawBytes("OFF"), 0, ""},
{sql.RawBytes("NO"), 0, ""},
{sql.RawBytes("YES"), 1, ""},
{sql.RawBytes("No"), 0, ""},
{sql.RawBytes("Yes"), 1, ""},
{sql.RawBytes("-794"), int64(-794), ""},
{sql.RawBytes("2147483647"), int64(2147483647), ""}, // max int32
{sql.RawBytes("2147483648"), int64(2147483648), ""}, // too big for int32
{sql.RawBytes("9223372036854775807"), int64(9223372036854775807), ""}, // max int64
{sql.RawBytes("9223372036854775808"), uint64(9223372036854775808), ""}, // too big for int64
{sql.RawBytes("18446744073709551615"), uint64(18446744073709551615), ""}, // max uint64
{sql.RawBytes("18446744073709551616"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes(""), nil, "unconvertible value"},
}
for _, cases := range testCases {
got, err := ParseValue(cases.rawByte)
if err != nil && cases.err == "" {
t.Errorf("for %q got unexpected error: %q", string(cases.rawByte), err.Error())
} else if err != nil && !strings.HasPrefix(err.Error(), cases.err) {
t.Errorf("for %q wanted error %q, got %q", string(cases.rawByte), cases.err, err.Error())
} else if err == nil && cases.err != "" {
t.Errorf("for %q did not get expected error: %s", string(cases.rawByte), cases.err)
} else if got != cases.output {
t.Errorf("for %q wanted %#v (%T), got %#v (%T)", string(cases.rawByte), cases.output, cases.output, got, got)
}
}
}