feat(outputs.postgresql): Allow limiting of column name length (#16041)
This commit is contained in:
parent
0d30797c08
commit
3b705f2aa0
|
|
@ -132,6 +132,12 @@ to use them.
|
||||||
## tag IDs. Each entry consumes approximately 34 bytes of memory.
|
## tag IDs. Each entry consumes approximately 34 bytes of memory.
|
||||||
# tag_cache_size = 100000
|
# tag_cache_size = 100000
|
||||||
|
|
||||||
|
## Cut column names at the given length to not exceed PostgreSQL's
|
||||||
|
## 'identifier length' limit (default: no limit)
|
||||||
|
## (see https://www.postgresql.org/docs/current/limits.html)
|
||||||
|
## Be careful to not create duplicate column names!
|
||||||
|
# column_name_length_limit = 0
|
||||||
|
|
||||||
## Enable & set the log level for the Postgres driver.
|
## Enable & set the log level for the Postgres driver.
|
||||||
# log_level = "warn" # trace, debug, info, warn, error, none
|
# log_level = "warn" # trace, debug, info, warn, error, none
|
||||||
```
|
```
|
||||||
|
|
@ -197,6 +203,19 @@ Documentation on how to write templates can be found [sqltemplate docs][1]
|
||||||
|
|
||||||
[1]: https://pkg.go.dev/github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate
|
[1]: https://pkg.go.dev/github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate
|
||||||
|
|
||||||
|
## Long Column Names
|
||||||
|
|
||||||
|
Postgres imposes a limit on the length of column identifiers, which can be found
|
||||||
|
in the [official docs](https://www.postgresql.org/docs/current/limits.html). By
|
||||||
|
default Telegraf does not enforce this limit, as it can be modified on
|
||||||
|
the server side. Furthermore, cutting off column names could lead to collisions
|
||||||
|
if the columns are only different after the cut-off.
|
||||||
|
|
||||||
|
> [!WARNING]
|
||||||
|
> Make sure you will not cause column name collisions when setting
|
||||||
|
> `column_name_length_limit`! If in doubt, explicitly shorten the field and tag
|
||||||
|
> names using e.g. the regexp processor.
|
||||||
|
|
||||||
### Samples
|
### Samples
|
||||||
|
|
||||||
#### TimescaleDB
|
#### TimescaleDB
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,7 @@ type Postgresql struct {
|
||||||
Uint64Type string `toml:"uint64_type"`
|
Uint64Type string `toml:"uint64_type"`
|
||||||
RetryMaxBackoff config.Duration `toml:"retry_max_backoff"`
|
RetryMaxBackoff config.Duration `toml:"retry_max_backoff"`
|
||||||
TagCacheSize int `toml:"tag_cache_size"`
|
TagCacheSize int `toml:"tag_cache_size"`
|
||||||
|
ColumnNameLenLimit int `toml:"column_name_length_limit"`
|
||||||
LogLevel string `toml:"log_level"`
|
LogLevel string `toml:"log_level"`
|
||||||
Logger telegraf.Logger `toml:"-"`
|
Logger telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -976,3 +976,239 @@ func TestStressConcurrencyIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLongColumnNamesErrorIntegration(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the plugin
|
||||||
|
p, err := newPostgresqlTest(t)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, p.Init())
|
||||||
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
|
// Define the metric to send
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
|
||||||
|
"value": 43,
|
||||||
|
},
|
||||||
|
time.Unix(0, 1).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
|
||||||
|
"value": 44,
|
||||||
|
},
|
||||||
|
time.Unix(0, 2).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
|
||||||
|
"value": 45,
|
||||||
|
},
|
||||||
|
time.Unix(0, 9).UTC(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
|
||||||
|
// Check if the logging is restricted to once per field and all columns are
|
||||||
|
// mentioned
|
||||||
|
var longColLogErrs []string
|
||||||
|
for _, l := range p.Logger.logs {
|
||||||
|
msg := l.String()
|
||||||
|
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
|
||||||
|
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
excpectedLongColumns := []string{
|
||||||
|
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||||||
|
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||||||
|
}
|
||||||
|
require.ElementsMatch(t, excpectedLongColumns, longColLogErrs)
|
||||||
|
|
||||||
|
// Denote the expected data in the table
|
||||||
|
expected := []map[string]interface{}{
|
||||||
|
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
|
||||||
|
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
|
||||||
|
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
|
||||||
|
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
|
||||||
|
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
|
||||||
|
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
|
||||||
|
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
|
||||||
|
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the actual table data nd convert the time to a timestamp for
|
||||||
|
// easier comparison
|
||||||
|
dump := dbTableDump(t, p.db, "")
|
||||||
|
require.Len(t, dump, len(expected))
|
||||||
|
for i, actual := range dump {
|
||||||
|
if raw, found := actual["time"]; found {
|
||||||
|
if t, ok := raw.(time.Time); ok {
|
||||||
|
actual["time"] = t.Unix()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.EqualValues(t, expected[i], actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLongColumnNamesClipIntegration(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the plugin
|
||||||
|
p, err := newPostgresqlTest(t)
|
||||||
|
require.NoError(t, err)
|
||||||
|
p.ColumnNameLenLimit = 63
|
||||||
|
require.NoError(t, p.Init())
|
||||||
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
|
// Define the metric to send
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
|
||||||
|
"value": 43,
|
||||||
|
},
|
||||||
|
time.Unix(0, 1).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
|
||||||
|
"value": 44,
|
||||||
|
},
|
||||||
|
time.Unix(0, 2).UTC(),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
t.Name(),
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
|
||||||
|
"value": 45,
|
||||||
|
},
|
||||||
|
time.Unix(0, 9).UTC(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
|
||||||
|
// Check if the logging is restricted to once per field and all columns are mentioned
|
||||||
|
var longColLogWarns []string
|
||||||
|
var longColLogErrs []string
|
||||||
|
for _, l := range p.Logger.logs {
|
||||||
|
msg := l.String()
|
||||||
|
if l.level == pgx.LogLevelWarn && strings.Contains(msg, "Limiting too long column name") {
|
||||||
|
longColLogWarns = append(longColLogWarns, strings.TrimPrefix(msg, "warn: Limiting too long column name: "))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
|
||||||
|
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
excpectedLongColumns := []string{
|
||||||
|
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||||||
|
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||||||
|
}
|
||||||
|
require.ElementsMatch(t, excpectedLongColumns, longColLogWarns)
|
||||||
|
require.Empty(t, longColLogErrs)
|
||||||
|
|
||||||
|
// Denote the expected data in the table
|
||||||
|
expected := []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 0).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(42),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 1).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(43),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 2).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(44),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 9).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
|
||||||
|
"value": int64(45),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 0).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(42),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 1).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(43),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 2).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||||||
|
"value": int64(44),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"time": time.Unix(0, 9).Unix(),
|
||||||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
|
||||||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
|
||||||
|
"value": int64(45),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the actual table data nd convert the time to a timestamp for
|
||||||
|
// easier comparison
|
||||||
|
dump := dbTableDump(t, p.db, "")
|
||||||
|
require.Len(t, dump, len(expected))
|
||||||
|
for i, actual := range dump {
|
||||||
|
if raw, found := actual["time"]; found {
|
||||||
|
if t, ok := raw.(time.Time); ok {
|
||||||
|
actual["time"] = t.Unix()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.EqualValues(t, expected[i], actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -87,5 +87,11 @@
|
||||||
## tag IDs. Each entry consumes approximately 34 bytes of memory.
|
## tag IDs. Each entry consumes approximately 34 bytes of memory.
|
||||||
# tag_cache_size = 100000
|
# tag_cache_size = 100000
|
||||||
|
|
||||||
|
## Cut column names at the given length to not exceed PostgreSQL's
|
||||||
|
## 'identifier length' limit (default: no limit)
|
||||||
|
## (see https://www.postgresql.org/docs/current/limits.html)
|
||||||
|
## Be careful to not create duplicate column names!
|
||||||
|
# column_name_length_limit = 0
|
||||||
|
|
||||||
## Enable & set the log level for the Postgres driver.
|
## Enable & set the log level for the Postgres driver.
|
||||||
# log_level = "warn" # trace, debug, info, warn, error, none
|
# log_level = "warn" # trace, debug, info, warn, error, none
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,10 @@ type TableManager struct {
|
||||||
// map[tableName]map[columnName]utils.Column
|
// map[tableName]map[columnName]utils.Column
|
||||||
tables map[string]*tableState
|
tables map[string]*tableState
|
||||||
tablesMutex sync.Mutex
|
tablesMutex sync.Mutex
|
||||||
|
|
||||||
|
// Map to track which columns are already logged
|
||||||
|
loggedLongColumnWarn map[string]bool
|
||||||
|
loggedLongColumnErr map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTableManager returns an instance of the tables.Manager interface
|
// NewTableManager returns an instance of the tables.Manager interface
|
||||||
|
|
@ -35,6 +39,8 @@ func NewTableManager(postgresql *Postgresql) *TableManager {
|
||||||
return &TableManager{
|
return &TableManager{
|
||||||
Postgresql: postgresql,
|
Postgresql: postgresql,
|
||||||
tables: make(map[string]*tableState),
|
tables: make(map[string]*tableState),
|
||||||
|
loggedLongColumnWarn: make(map[string]bool),
|
||||||
|
loggedLongColumnErr: make(map[string]bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -178,7 +184,15 @@ func (tm *TableManager) EnsureStructure(
|
||||||
// check that the missing columns are columns that can be added
|
// check that the missing columns are columns that can be added
|
||||||
addColumns := make([]utils.Column, 0, len(missingCols))
|
addColumns := make([]utils.Column, 0, len(missingCols))
|
||||||
invalidColumns := make([]utils.Column, 0, len(missingCols))
|
invalidColumns := make([]utils.Column, 0, len(missingCols))
|
||||||
for _, col := range missingCols {
|
for i, col := range missingCols {
|
||||||
|
if tm.ColumnNameLenLimit > 0 && len(col.Name) > tm.ColumnNameLenLimit {
|
||||||
|
if !tm.loggedLongColumnWarn[col.Name] {
|
||||||
|
tm.Postgresql.Logger.Warnf("Limiting too long column name: %q", col.Name)
|
||||||
|
tm.loggedLongColumnWarn[col.Name] = true
|
||||||
|
}
|
||||||
|
col.Name = col.Name[:tm.ColumnNameLenLimit]
|
||||||
|
missingCols[i] = col
|
||||||
|
}
|
||||||
if tm.validateColumnName(col.Name) {
|
if tm.validateColumnName(col.Name) {
|
||||||
addColumns = append(addColumns, col)
|
addColumns = append(addColumns, col)
|
||||||
continue
|
continue
|
||||||
|
|
@ -187,7 +201,10 @@ func (tm *TableManager) EnsureStructure(
|
||||||
if col.Role == utils.TagColType {
|
if col.Role == utils.TagColType {
|
||||||
return nil, fmt.Errorf("column name too long: %q", col.Name)
|
return nil, fmt.Errorf("column name too long: %q", col.Name)
|
||||||
}
|
}
|
||||||
|
if !tm.loggedLongColumnErr[col.Name] {
|
||||||
tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name)
|
tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name)
|
||||||
|
tm.loggedLongColumnErr[col.Name] = true
|
||||||
|
}
|
||||||
invalidColumns = append(invalidColumns, col)
|
invalidColumns = append(invalidColumns, col)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue