diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index 50386fbbc..11214092d 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -35,4 +35,6 @@ config option, see below. table = "metrics" # If true, and the metrics table does not exist, create it automatically. table_create = true + # The character(s) to replace any '.' in an object key with + key_separator = "_" ``` diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index a28e29dc0..b56787114 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -20,11 +20,12 @@ import ( const MaxInt64 = int64(^uint64(0) >> 1) type CrateDB struct { - URL string - Timeout config.Duration - Table string - TableCreate bool `toml:"table_create"` - DB *sql.DB + URL string + Timeout config.Duration + Table string + TableCreate bool `toml:"table_create"` + KeySeparator string `toml:"key_separator"` + DB *sql.DB } var sampleConfig = ` @@ -37,6 +38,8 @@ var sampleConfig = ` table = "metrics" # If true, and the metrics table does not exist, create it automatically. table_create = true + # The character(s) to replace any '.' in an object key with + key_separator = "_" ` func (c *CrateDB) Connect() error { @@ -68,15 +71,21 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( func (c *CrateDB) Write(metrics []telegraf.Metric) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)) defer cancel() - if sql, err := insertSQL(c.Table, metrics); err != nil { - return err - } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { + + generatedSQL, err := insertSQL(c.Table, c.KeySeparator, metrics) + if err != nil { return err } + + _, err = c.DB.ExecContext(ctx, generatedSQL) + if err != nil { + return err + } + return nil } -func insertSQL(table string, metrics []telegraf.Metric) (string, error) { +func insertSQL(table string, keyReplacement string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { cols := []interface{}{ @@ -89,7 +98,7 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) { escapedCols := make([]string, len(cols)) for i, col := range cols { - escaped, err := escapeValue(col) + escaped, err := escapeValue(col, keyReplacement) if err != nil { return "", err } @@ -113,7 +122,7 @@ VALUES // inputs. // // [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371 -func escapeValue(val interface{}) (string, error) { +func escapeValue(val interface{}, keyReplacement string) (string, error) { switch t := val.(type) { case string: return escapeString(t, `'`), nil @@ -131,11 +140,11 @@ func escapeValue(val interface{}) (string, error) { return strconv.FormatBool(t), nil case time.Time: // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp - return escapeValue(t.Format("2006-01-02T15:04:05.999-0700")) + return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"), keyReplacement) case map[string]string: - return escapeObject(convertMap(t)) + return escapeObject(convertMap(t), keyReplacement) case map[string]interface{}: - return escapeObject(t) + return escapeObject(t, keyReplacement) default: // This might be panic worthy under normal circumstances, but it's probably // better to not shut down the entire telegraf process because of one @@ -154,7 +163,7 @@ func convertMap(m map[string]string) map[string]interface{} { return c } -func escapeObject(m map[string]interface{}) (string, error) { +func escapeObject(m map[string]interface{}, keyReplacement string) (string, error) { // There is a decent chance that the implementation below doesn't catch all // edge cases, but it's hard to tell since the format seems to be a bit // underspecified. @@ -171,12 +180,15 @@ func escapeObject(m map[string]interface{}) (string, error) { // Now we build our key = val pairs pairs := make([]string, 0, len(m)) for _, k := range keys { - // escape the value of our key k (potentially recursive) - val, err := escapeValue(m[k]) + key := escapeString(strings.ReplaceAll(k, ".", keyReplacement), `"`) + + // escape the value of the value at k (potentially recursive) + val, err := escapeValue(m[k], keyReplacement) if err != nil { return "", err } - pairs = append(pairs, escapeString(k, `"`)+" = "+val) + + pairs = append(pairs, key+" = "+val) } return `{` + strings.Join(pairs, ", ") + `}`, nil } diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 66a2bfa79..0bdfd8d3e 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -2,7 +2,6 @@ package cratedb import ( "database/sql" - "fmt" "os" "strings" "testing" @@ -49,9 +48,9 @@ func TestConnectAndWriteIntegration(t *testing.T) { // the rows using their primary keys in order to take advantage of // read-after-write consistency in CrateDB. for _, m := range metrics { - hashIDVal, err := escapeValue(hashID(m)) + hashIDVal, err := escapeValue(hashID(m), "_") require.NoError(t, err) - timestamp, err := escapeValue(m.Time()) + timestamp, err := escapeValue(m.Time(), "_") require.NoError(t, err) var id int64 @@ -85,7 +84,7 @@ VALUES } for _, test := range tests { - if got, err := insertSQL("my_table", test.Metrics); err != nil { + if got, err := insertSQL("my_table", "_", test.Metrics); err != nil { t.Error(err) } else if got != test.Want { t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) @@ -93,17 +92,13 @@ VALUES } } -func Test_escapeValueIntegration(t *testing.T) { - t.Skip("Skipping due to trust authentication failure") +type escapeValueTest struct { + Value interface{} + Want string +} - if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" { - t.Skip("Skipping test on CircleCI due to docker failures") - } - - tests := []struct { - Val interface{} - Want string - }{ +func escapeValueTests() []escapeValueTest { + return []escapeValueTest{ // string {`foo`, `'foo'`}, {`foo'bar 'yeah`, `'foo''bar ''yeah'`}, @@ -122,6 +117,7 @@ func Test_escapeValueIntegration(t *testing.T) { {map[string]string(nil), `{}`}, {map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`}, {map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, + {map[string]string{"f.oo": "bar", "o.n.e": "more"}, `{"f_oo" = 'bar', "o_n_e" = 'more'}`}, // map[string]interface{} {map[string]interface{}{}, `{}`}, {map[string]interface{}(nil), `{}`}, @@ -130,29 +126,47 @@ func Test_escapeValueIntegration(t *testing.T) { {map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`}, {map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`}, } +} - url := testURL() - fmt.Println("url", url) - db, err := sql.Open("pgx", url) +func Test_escapeValueIntegration(t *testing.T) { + t.Skip("Skipping due to trust authentication failure") + + if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" { + t.Skip("Skipping test on CircleCI due to docker failures") + } + + db, err := sql.Open("pgx", testURL()) require.NoError(t, err) defer db.Close() + tests := escapeValueTests() for _, test := range tests { - got, err := escapeValue(test.Val) - if err != nil { - t.Errorf("val: %#v: %s", test.Val, err) - } else if got != test.Want { - t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) - } + got, err := escapeValue(test.Value, "_") + require.NoError(t, err, "value: %#v", test.Value) // This is a smoke test that will blow up if our escaping causing a SQL - // syntax error, which may allow for an attack. + // syntax error, which may allow for an attack.= var reply interface{} row := db.QueryRow("SELECT " + got) require.NoError(t, row.Scan(&reply)) } } +func Test_escapeValue(t *testing.T) { + tests := escapeValueTests() + for _, test := range tests { + got, err := escapeValue(test.Value, "_") + require.NoError(t, err, "value: %#v", test.Value) + require.Equal(t, got, test.Want) + } +} + +func Test_circumeventingStringEscape(t *testing.T) { + value, err := escapeObject(map[string]interface{}{"a.b": "c"}, `_"`) + require.NoError(t, err) + require.Equal(t, value, `{"a_""b" = 'c'}`) +} + func Test_hashID(t *testing.T) { tests := []struct { Name string