From 3b705f2aa051ff105e32174f6148cf7b4fbff1f1 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 13 Nov 2024 08:29:06 +0100 Subject: [PATCH] feat(outputs.postgresql): Allow limiting of column name length (#16041) --- plugins/outputs/postgresql/README.md | 19 ++ plugins/outputs/postgresql/postgresql.go | 1 + plugins/outputs/postgresql/postgresql_test.go | 236 ++++++++++++++++++ plugins/outputs/postgresql/sample.conf | 6 + plugins/outputs/postgresql/table_manager.go | 25 +- 5 files changed, 283 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/postgresql/README.md b/plugins/outputs/postgresql/README.md index 7a0c628e0..252704275 100644 --- a/plugins/outputs/postgresql/README.md +++ b/plugins/outputs/postgresql/README.md @@ -132,6 +132,12 @@ to use them. ## tag IDs. Each entry consumes approximately 34 bytes of memory. # tag_cache_size = 100000 + ## Cut column names at the given length to not exceed PostgreSQL's + ## 'identifier length' limit (default: no limit) + ## (see https://www.postgresql.org/docs/current/limits.html) + ## Be careful to not create duplicate column names! + # column_name_length_limit = 0 + ## Enable & set the log level for the Postgres driver. # log_level = "warn" # trace, debug, info, warn, error, none ``` @@ -197,6 +203,19 @@ Documentation on how to write templates can be found [sqltemplate docs][1] [1]: https://pkg.go.dev/github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate +## Long Column Names + +Postgres imposes a limit on the length of column identifiers, which can be found +in the [official docs](https://www.postgresql.org/docs/current/limits.html). By +default Telegraf does not enforce this limit as this limit can be modified on +the server side. Furthermore, cutting off column names could lead to collisions +if the columns are only different after the cut-off. + +> [!WARNING] +> Make sure you will not cause column name collisions when setting +> `column_name_length_limit`! If in doubt, explicitly shorten the field and tag +> names using e.g. the regexp processor. + ### Samples #### TimescaleDB diff --git a/plugins/outputs/postgresql/postgresql.go b/plugins/outputs/postgresql/postgresql.go index 0e93b061e..2f1a8164a 100644 --- a/plugins/outputs/postgresql/postgresql.go +++ b/plugins/outputs/postgresql/postgresql.go @@ -51,6 +51,7 @@ type Postgresql struct { Uint64Type string `toml:"uint64_type"` RetryMaxBackoff config.Duration `toml:"retry_max_backoff"` TagCacheSize int `toml:"tag_cache_size"` + ColumnNameLenLimit int `toml:"column_name_length_limit"` LogLevel string `toml:"log_level"` Logger telegraf.Logger `toml:"-"` diff --git a/plugins/outputs/postgresql/postgresql_test.go b/plugins/outputs/postgresql/postgresql_test.go index 846f5c27f..ad9afee45 100644 --- a/plugins/outputs/postgresql/postgresql_test.go +++ b/plugins/outputs/postgresql/postgresql_test.go @@ -976,3 +976,239 @@ func TestStressConcurrencyIntegration(t *testing.T) { } } } + +func TestLongColumnNamesErrorIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Setup the plugin + p, err := newPostgresqlTest(t) + require.NoError(t, err) + require.NoError(t, p.Init()) + require.NoError(t, p.Connect()) + + // Define the metric to send + metrics := []telegraf.Metric{ + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0), + "value": 42, + }, + time.Unix(0, 0).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1), + "value": 43, + }, + time.Unix(0, 1).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2), + "value": 44, + }, + time.Unix(0, 2).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99), + "value": 45, + }, + time.Unix(0, 9).UTC(), + ), + } + require.NoError(t, p.Write(metrics)) + require.NoError(t, p.Write(metrics)) + + // Check if the logging is restricted to once per field and all columns are + // mentioned + var longColLogErrs []string + for _, l := range p.Logger.logs { + msg := l.String() + if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") { + longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: ")) + } + } + excpectedLongColumns := []string{ + `"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`, + `"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`, + } + require.ElementsMatch(t, excpectedLongColumns, longColLogErrs) + + // Denote the expected data in the table + expected := []map[string]interface{}{ + {"time": time.Unix(0, 0).Unix(), "value": int64(42)}, + {"time": time.Unix(0, 1).Unix(), "value": int64(43)}, + {"time": time.Unix(0, 2).Unix(), "value": int64(44)}, + {"time": time.Unix(0, 9).Unix(), "value": int64(45)}, + {"time": time.Unix(0, 0).Unix(), "value": int64(42)}, + {"time": time.Unix(0, 1).Unix(), "value": int64(43)}, + {"time": time.Unix(0, 2).Unix(), "value": int64(44)}, + {"time": time.Unix(0, 9).Unix(), "value": int64(45)}, + } + + // Get the actual table data nd convert the time to a timestamp for + // easier comparison + dump := dbTableDump(t, p.db, "") + require.Len(t, dump, len(expected)) + for i, actual := range dump { + if raw, found := actual["time"]; found { + if t, ok := raw.(time.Time); ok { + actual["time"] = t.Unix() + } + } + require.EqualValues(t, expected[i], actual) + } +} + +func TestLongColumnNamesClipIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Setup the plugin + p, err := newPostgresqlTest(t) + require.NoError(t, err) + p.ColumnNameLenLimit = 63 + require.NoError(t, p.Init()) + require.NoError(t, p.Connect()) + + // Define the metric to send + metrics := []telegraf.Metric{ + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0), + "value": 42, + }, + time.Unix(0, 0).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1), + "value": 43, + }, + time.Unix(0, 1).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2), + "value": 44, + }, + time.Unix(0, 2).UTC(), + ), + metric.New( + t.Name(), + map[string]string{}, + map[string]interface{}{ + "a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99), + "value": 45, + }, + time.Unix(0, 9).UTC(), + ), + } + require.NoError(t, p.Write(metrics)) + require.NoError(t, p.Write(metrics)) + + // Check if the logging is restricted to once per field and all columns are mentioned + var longColLogWarns []string + var longColLogErrs []string + for _, l := range p.Logger.logs { + msg := l.String() + if l.level == pgx.LogLevelWarn && strings.Contains(msg, "Limiting too long column name") { + longColLogWarns = append(longColLogWarns, strings.TrimPrefix(msg, "warn: Limiting too long column name: ")) + continue + } + if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") { + longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: ")) + continue + } + } + + excpectedLongColumns := []string{ + `"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`, + `"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`, + } + require.ElementsMatch(t, excpectedLongColumns, longColLogWarns) + require.Empty(t, longColLogErrs) + + // Denote the expected data in the table + expected := []map[string]interface{}{ + { + "time": time.Unix(0, 0).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(42), + }, + { + "time": time.Unix(0, 1).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(43), + }, + { + "time": time.Unix(0, 2).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(44), + }, + { + "time": time.Unix(0, 9).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil, + "a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99), + "value": int64(45), + }, + { + "time": time.Unix(0, 0).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(42), + }, + { + "time": time.Unix(0, 1).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(43), + }, + { + "time": time.Unix(0, 2).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2), + "a_field_with_another_very_long_name_exceeding_the_column_name_l": nil, + "value": int64(44), + }, + { + "time": time.Unix(0, 9).Unix(), + "a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil, + "a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99), + "value": int64(45), + }, + } + + // Get the actual table data nd convert the time to a timestamp for + // easier comparison + dump := dbTableDump(t, p.db, "") + require.Len(t, dump, len(expected)) + for i, actual := range dump { + if raw, found := actual["time"]; found { + if t, ok := raw.(time.Time); ok { + actual["time"] = t.Unix() + } + } + require.EqualValues(t, expected[i], actual) + } +} diff --git a/plugins/outputs/postgresql/sample.conf b/plugins/outputs/postgresql/sample.conf index e1a99e796..13cc0c32f 100644 --- a/plugins/outputs/postgresql/sample.conf +++ b/plugins/outputs/postgresql/sample.conf @@ -87,5 +87,11 @@ ## tag IDs. Each entry consumes approximately 34 bytes of memory. # tag_cache_size = 100000 + ## Cut column names at the given length to not exceed PostgreSQL's + ## 'identifier length' limit (default: no limit) + ## (see https://www.postgresql.org/docs/current/limits.html) + ## Be careful to not create duplicate column names! + # column_name_length_limit = 0 + ## Enable & set the log level for the Postgres driver. # log_level = "warn" # trace, debug, info, warn, error, none diff --git a/plugins/outputs/postgresql/table_manager.go b/plugins/outputs/postgresql/table_manager.go index 7dea7f8fa..a0cd5a9bc 100644 --- a/plugins/outputs/postgresql/table_manager.go +++ b/plugins/outputs/postgresql/table_manager.go @@ -27,14 +27,20 @@ type TableManager struct { // map[tableName]map[columnName]utils.Column tables map[string]*tableState tablesMutex sync.Mutex + + // Map to track which columns are already logged + loggedLongColumnWarn map[string]bool + loggedLongColumnErr map[string]bool } // NewTableManager returns an instance of the tables.Manager interface // that can handle checking and updating the state of tables in the PG database. func NewTableManager(postgresql *Postgresql) *TableManager { return &TableManager{ - Postgresql: postgresql, - tables: make(map[string]*tableState), + Postgresql: postgresql, + tables: make(map[string]*tableState), + loggedLongColumnWarn: make(map[string]bool), + loggedLongColumnErr: make(map[string]bool), } } @@ -178,7 +184,15 @@ func (tm *TableManager) EnsureStructure( // check that the missing columns are columns that can be added addColumns := make([]utils.Column, 0, len(missingCols)) invalidColumns := make([]utils.Column, 0, len(missingCols)) - for _, col := range missingCols { + for i, col := range missingCols { + if tm.ColumnNameLenLimit > 0 && len(col.Name) > tm.ColumnNameLenLimit { + if !tm.loggedLongColumnWarn[col.Name] { + tm.Postgresql.Logger.Warnf("Limiting too long column name: %q", col.Name) + tm.loggedLongColumnWarn[col.Name] = true + } + col.Name = col.Name[:tm.ColumnNameLenLimit] + missingCols[i] = col + } if tm.validateColumnName(col.Name) { addColumns = append(addColumns, col) continue @@ -187,7 +201,10 @@ func (tm *TableManager) EnsureStructure( if col.Role == utils.TagColType { return nil, fmt.Errorf("column name too long: %q", col.Name) } - tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name) + if !tm.loggedLongColumnErr[col.Name] { + tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name) + tm.loggedLongColumnErr[col.Name] = true + } invalidColumns = append(invalidColumns, col) }