fix: CrateDB replace dots in tag keys with underscores (#9566)
This commit is contained in:
parent
83bd10b4db
commit
eb41218fe0
|
|
@ -35,4 +35,6 @@ config option, see below.
|
|||
table = "metrics"
|
||||
# If true, and the metrics table does not exist, create it automatically.
|
||||
table_create = true
|
||||
# The character(s) to replace any '.' in an object key with
|
||||
key_separator = "_"
|
||||
```
|
||||
|
|
|
|||
|
|
@ -20,11 +20,12 @@ import (
|
|||
const MaxInt64 = int64(^uint64(0) >> 1)
|
||||
|
||||
type CrateDB struct {
|
||||
URL string
|
||||
Timeout config.Duration
|
||||
Table string
|
||||
TableCreate bool `toml:"table_create"`
|
||||
DB *sql.DB
|
||||
URL string
|
||||
Timeout config.Duration
|
||||
Table string
|
||||
TableCreate bool `toml:"table_create"`
|
||||
KeySeparator string `toml:"key_separator"`
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
|
|
@ -37,6 +38,8 @@ var sampleConfig = `
|
|||
table = "metrics"
|
||||
# If true, and the metrics table does not exist, create it automatically.
|
||||
table_create = true
|
||||
# The character(s) to replace any '.' in an object key with
|
||||
key_separator = "_"
|
||||
`
|
||||
|
||||
func (c *CrateDB) Connect() error {
|
||||
|
|
@ -68,15 +71,21 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
|
|||
func (c *CrateDB) Write(metrics []telegraf.Metric) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
|
||||
defer cancel()
|
||||
if sql, err := insertSQL(c.Table, metrics); err != nil {
|
||||
return err
|
||||
} else if _, err := c.DB.ExecContext(ctx, sql); err != nil {
|
||||
|
||||
generatedSQL, err := insertSQL(c.Table, c.KeySeparator, metrics)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.DB.ExecContext(ctx, generatedSQL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertSQL(table string, metrics []telegraf.Metric) (string, error) {
|
||||
func insertSQL(table string, keyReplacement string, metrics []telegraf.Metric) (string, error) {
|
||||
rows := make([]string, len(metrics))
|
||||
for i, m := range metrics {
|
||||
cols := []interface{}{
|
||||
|
|
@ -89,7 +98,7 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) {
|
|||
|
||||
escapedCols := make([]string, len(cols))
|
||||
for i, col := range cols {
|
||||
escaped, err := escapeValue(col)
|
||||
escaped, err := escapeValue(col, keyReplacement)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
@ -113,7 +122,7 @@ VALUES
|
|||
// inputs.
|
||||
//
|
||||
// [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371
|
||||
func escapeValue(val interface{}) (string, error) {
|
||||
func escapeValue(val interface{}, keyReplacement string) (string, error) {
|
||||
switch t := val.(type) {
|
||||
case string:
|
||||
return escapeString(t, `'`), nil
|
||||
|
|
@ -131,11 +140,11 @@ func escapeValue(val interface{}) (string, error) {
|
|||
return strconv.FormatBool(t), nil
|
||||
case time.Time:
|
||||
// see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp
|
||||
return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"))
|
||||
return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"), keyReplacement)
|
||||
case map[string]string:
|
||||
return escapeObject(convertMap(t))
|
||||
return escapeObject(convertMap(t), keyReplacement)
|
||||
case map[string]interface{}:
|
||||
return escapeObject(t)
|
||||
return escapeObject(t, keyReplacement)
|
||||
default:
|
||||
// This might be panic worthy under normal circumstances, but it's probably
|
||||
// better to not shut down the entire telegraf process because of one
|
||||
|
|
@ -154,7 +163,7 @@ func convertMap(m map[string]string) map[string]interface{} {
|
|||
return c
|
||||
}
|
||||
|
||||
func escapeObject(m map[string]interface{}) (string, error) {
|
||||
func escapeObject(m map[string]interface{}, keyReplacement string) (string, error) {
|
||||
// There is a decent chance that the implementation below doesn't catch all
|
||||
// edge cases, but it's hard to tell since the format seems to be a bit
|
||||
// underspecified.
|
||||
|
|
@ -171,12 +180,15 @@ func escapeObject(m map[string]interface{}) (string, error) {
|
|||
// Now we build our key = val pairs
|
||||
pairs := make([]string, 0, len(m))
|
||||
for _, k := range keys {
|
||||
// escape the value of our key k (potentially recursive)
|
||||
val, err := escapeValue(m[k])
|
||||
key := escapeString(strings.ReplaceAll(k, ".", keyReplacement), `"`)
|
||||
|
||||
// escape the value of the value at k (potentially recursive)
|
||||
val, err := escapeValue(m[k], keyReplacement)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
pairs = append(pairs, escapeString(k, `"`)+" = "+val)
|
||||
|
||||
pairs = append(pairs, key+" = "+val)
|
||||
}
|
||||
return `{` + strings.Join(pairs, ", ") + `}`, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package cratedb
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
|
@ -49,9 +48,9 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
|||
// the rows using their primary keys in order to take advantage of
|
||||
// read-after-write consistency in CrateDB.
|
||||
for _, m := range metrics {
|
||||
hashIDVal, err := escapeValue(hashID(m))
|
||||
hashIDVal, err := escapeValue(hashID(m), "_")
|
||||
require.NoError(t, err)
|
||||
timestamp, err := escapeValue(m.Time())
|
||||
timestamp, err := escapeValue(m.Time(), "_")
|
||||
require.NoError(t, err)
|
||||
|
||||
var id int64
|
||||
|
|
@ -85,7 +84,7 @@ VALUES
|
|||
}
|
||||
|
||||
for _, test := range tests {
|
||||
if got, err := insertSQL("my_table", test.Metrics); err != nil {
|
||||
if got, err := insertSQL("my_table", "_", test.Metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if got != test.Want {
|
||||
t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
|
||||
|
|
@ -93,17 +92,13 @@ VALUES
|
|||
}
|
||||
}
|
||||
|
||||
func Test_escapeValueIntegration(t *testing.T) {
|
||||
t.Skip("Skipping due to trust authentication failure")
|
||||
type escapeValueTest struct {
|
||||
Value interface{}
|
||||
Want string
|
||||
}
|
||||
|
||||
if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" {
|
||||
t.Skip("Skipping test on CircleCI due to docker failures")
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
Val interface{}
|
||||
Want string
|
||||
}{
|
||||
func escapeValueTests() []escapeValueTest {
|
||||
return []escapeValueTest{
|
||||
// string
|
||||
{`foo`, `'foo'`},
|
||||
{`foo'bar 'yeah`, `'foo''bar ''yeah'`},
|
||||
|
|
@ -122,6 +117,7 @@ func Test_escapeValueIntegration(t *testing.T) {
|
|||
{map[string]string(nil), `{}`},
|
||||
{map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`},
|
||||
{map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`},
|
||||
{map[string]string{"f.oo": "bar", "o.n.e": "more"}, `{"f_oo" = 'bar', "o_n_e" = 'more'}`},
|
||||
// map[string]interface{}
|
||||
{map[string]interface{}{}, `{}`},
|
||||
{map[string]interface{}(nil), `{}`},
|
||||
|
|
@ -130,29 +126,47 @@ func Test_escapeValueIntegration(t *testing.T) {
|
|||
{map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`},
|
||||
{map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`},
|
||||
}
|
||||
}
|
||||
|
||||
url := testURL()
|
||||
fmt.Println("url", url)
|
||||
db, err := sql.Open("pgx", url)
|
||||
func Test_escapeValueIntegration(t *testing.T) {
|
||||
t.Skip("Skipping due to trust authentication failure")
|
||||
|
||||
if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" {
|
||||
t.Skip("Skipping test on CircleCI due to docker failures")
|
||||
}
|
||||
|
||||
db, err := sql.Open("pgx", testURL())
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
tests := escapeValueTests()
|
||||
for _, test := range tests {
|
||||
got, err := escapeValue(test.Val)
|
||||
if err != nil {
|
||||
t.Errorf("val: %#v: %s", test.Val, err)
|
||||
} else if got != test.Want {
|
||||
t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
|
||||
}
|
||||
got, err := escapeValue(test.Value, "_")
|
||||
require.NoError(t, err, "value: %#v", test.Value)
|
||||
|
||||
// This is a smoke test that will blow up if our escaping causing a SQL
|
||||
// syntax error, which may allow for an attack.
|
||||
// syntax error, which may allow for an attack.=
|
||||
var reply interface{}
|
||||
row := db.QueryRow("SELECT " + got)
|
||||
require.NoError(t, row.Scan(&reply))
|
||||
}
|
||||
}
|
||||
|
||||
func Test_escapeValue(t *testing.T) {
|
||||
tests := escapeValueTests()
|
||||
for _, test := range tests {
|
||||
got, err := escapeValue(test.Value, "_")
|
||||
require.NoError(t, err, "value: %#v", test.Value)
|
||||
require.Equal(t, got, test.Want)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_circumeventingStringEscape(t *testing.T) {
|
||||
value, err := escapeObject(map[string]interface{}{"a.b": "c"}, `_"`)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, value, `{"a_""b" = 'c'}`)
|
||||
}
|
||||
|
||||
func Test_hashID(t *testing.T) {
|
||||
tests := []struct {
|
||||
Name string
|
||||
|
|
|
|||
Loading…
Reference in New Issue