diff --git a/plugins/outputs/postgresql/README.md b/plugins/outputs/postgresql/README.md index da00a1a21..bde0089d7 100644 --- a/plugins/outputs/postgresql/README.md +++ b/plugins/outputs/postgresql/README.md @@ -54,6 +54,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Store all fields as a JSONB object in a single 'fields' column. # fields_as_jsonb = false + ## Name of the timestamp column + ## NOTE: Some tools (e.g. Grafana) require the default name so be careful! + # timestamp_column_name = "time" + ## Templated statements to execute when creating a new table. # create_templates = [ # '''CREATE TABLE {{ .table }} ({{ .columns }})''', diff --git a/plugins/outputs/postgresql/columns.go b/plugins/outputs/postgresql/columns.go index ca08fdfd5..b5bec6bfb 100644 --- a/plugins/outputs/postgresql/columns.go +++ b/plugins/outputs/postgresql/columns.go @@ -2,22 +2,6 @@ package postgresql import "github.com/influxdata/telegraf/plugins/outputs/postgresql/utils" -// Column names and data types for standard fields (time, tag_id, tags, and fields) -const ( - timeColumnName = "time" - timeColumnDataType = PgTimestampWithoutTimeZone - tagIDColumnName = "tag_id" - tagIDColumnDataType = PgBigInt - tagsJSONColumnName = "tags" - fieldsJSONColumnName = "fields" - jsonColumnDataType = PgJSONb -) - -var timeColumn = utils.Column{Name: timeColumnName, Type: timeColumnDataType, Role: utils.TimeColType} -var tagIDColumn = utils.Column{Name: tagIDColumnName, Type: tagIDColumnDataType, Role: utils.TagsIDColType} -var fieldsJSONColumn = utils.Column{Name: fieldsJSONColumnName, Type: jsonColumnDataType, Role: utils.FieldColType} -var tagsJSONColumn = utils.Column{Name: tagsJSONColumnName, Type: jsonColumnDataType, Role: utils.TagColType} - func (p *Postgresql) columnFromTag(key string, value interface{}) utils.Column { return utils.Column{Name: key, Type: p.derivePgDatatype(value), Role: utils.TagColType} } diff --git a/plugins/outputs/postgresql/postgresql.go b/plugins/outputs/postgresql/postgresql.go index 713d52d92..cda801a63 100644 --- a/plugins/outputs/postgresql/postgresql.go +++ b/plugins/outputs/postgresql/postgresql.go @@ -41,6 +41,7 @@ type Postgresql struct { ForeignTagConstraint bool `toml:"foreign_tag_constraint"` TagsAsJsonb bool `toml:"tags_as_jsonb"` FieldsAsJsonb bool `toml:"fields_as_jsonb"` + TimestampColumnName string `toml:"timestamp_column_name"` CreateTemplates []*sqltemplate.Template `toml:"create_templates"` AddColumnTemplates []*sqltemplate.Template `toml:"add_column_templates"` TagTableCreateTemplates []*sqltemplate.Template `toml:"tag_table_create_templates"` @@ -49,6 +50,7 @@ type Postgresql struct { RetryMaxBackoff config.Duration `toml:"retry_max_backoff"` TagCacheSize int `toml:"tag_cache_size"` LogLevel string `toml:"log_level"` + Logger telegraf.Logger `toml:"-"` dbContext context.Context dbContextCancel func() @@ -62,7 +64,11 @@ type Postgresql struct { writeChan chan *TableSource writeWaitGroup *utils.WaitGroup - Logger telegraf.Logger `toml:"-"` + // Column types + timeColumn utils.Column + tagIDColumn utils.Column + fieldsJSONColumn utils.Column + tagsJSONColumn utils.Column } func init() { @@ -99,6 +105,21 @@ func (p *Postgresql) Init() error { return fmt.Errorf("invalid tag_cache_size") } + // Set the time-column name + if p.TimestampColumnName == "" { + p.TimestampColumnName = "time" + } + + // Initialize the column prototypes + p.timeColumn = utils.Column{ + Name: p.TimestampColumnName, + Type: PgTimestampWithoutTimeZone, + Role: utils.TimeColType, + } + p.tagIDColumn = utils.Column{Name: "tag_id", Type: PgBigInt, Role: utils.TagsIDColType} + p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType} + p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType} + var err error if p.dbConfig, err = pgxpool.ParseConfig(p.Connection); err != nil { return err diff --git a/plugins/outputs/postgresql/postgresql_test.go b/plugins/outputs/postgresql/postgresql_test.go index f0917fc63..4ba557d85 100644 --- a/plugins/outputs/postgresql/postgresql_test.go +++ b/plugins/outputs/postgresql/postgresql_test.go @@ -17,6 +17,7 @@ import ( "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/postgresql/utils" "github.com/influxdata/telegraf/testutil" ) @@ -588,6 +589,39 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) { require.True(t, haveError, "write error not found in log") } +func TestTimestampColumnNameIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := newPostgresqlTest(t) + p.TimestampColumnName = "timestamp" + require.NoError(t, p.Init()) + require.NoError(t, p.Connect()) + + metrics := []telegraf.Metric{ + metric.New(t.Name(), map[string]string{}, map[string]interface{}{"v": 42}, time.Unix(1691747345, 0)), + } + require.NoError(t, p.Write(metrics)) + + dump := dbTableDump(t, p.db, "") + require.Len(t, dump, 1) + require.EqualValues(t, 42, dump[0]["v"]) + require.EqualValues(t, time.Unix(1691747345, 0).UTC(), dump[0]["timestamp"]) + require.NotContains(t, dump[0], "time") + + p.Logger.Clear() + require.NoError(t, p.Write(metrics)) + + stmtCount := 0 + for _, log := range p.Logger.Logs() { + if strings.Contains(log.String(), "info: PG ") { + stmtCount++ + } + } + require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT +} + func TestWriteTagTableIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") diff --git a/plugins/outputs/postgresql/sample.conf b/plugins/outputs/postgresql/sample.conf index 12199473a..d2493f17c 100644 --- a/plugins/outputs/postgresql/sample.conf +++ b/plugins/outputs/postgresql/sample.conf @@ -37,6 +37,10 @@ ## Store all fields as a JSONB object in a single 'fields' column. # fields_as_jsonb = false + ## Name of the timestamp column + ## NOTE: Some tools (e.g. Grafana) require the default name so be careful! + # timestamp_column_name = "time" + ## Templated statements to execute when creating a new table. # create_templates = [ # '''CREATE TABLE {{ .table }} ({{ .columns }})''', diff --git a/plugins/outputs/postgresql/table_manager.go b/plugins/outputs/postgresql/table_manager.go index e240cc3a3..205130747 100644 --- a/plugins/outputs/postgresql/table_manager.go +++ b/plugins/outputs/postgresql/table_manager.go @@ -318,16 +318,17 @@ func (tm *TableManager) getColumns(ctx context.Context, db dbh, name string) (ma role := utils.FieldColType switch colName { - case timeColumnName: + case tm.timeColumn.Name: role = utils.TimeColType - case tagIDColumnName: + case tm.tagIDColumn.Name: role = utils.TagsIDColType - case tagsJSONColumnName: + case tm.tagsJSONColumn.Name: role = utils.TagColType - case fieldsJSONColumnName: + case tm.fieldsJSONColumn.Name: role = utils.FieldColType default: - // We don't want to monopolize the column comment (preventing user from storing other information there), so just look at the first word + // We don't want to monopolize the column comment (preventing user from storing other information there), + // so just look at the first word if desc != nil { descWords := strings.Split(*desc, " ") if descWords[0] == "tag" { diff --git a/plugins/outputs/postgresql/table_source.go b/plugins/outputs/postgresql/table_source.go index 18302a910..2584ae464 100644 --- a/plugins/outputs/postgresql/table_source.go +++ b/plugins/outputs/postgresql/table_source.go @@ -133,7 +133,7 @@ func (tsrc *TableSource) TagColumns() []utils.Column { var cols []utils.Column if tsrc.postgresql.TagsAsJsonb { - cols = append(cols, tagsJSONColumn) + cols = append(cols, tsrc.postgresql.tagsJSONColumn) } else { cols = append(cols, tsrc.tagColumns.columns...) } @@ -149,17 +149,17 @@ func (tsrc *TableSource) FieldColumns() []utils.Column { // MetricTableColumns returns the full column list, including time, tag id or tags, and fields. func (tsrc *TableSource) MetricTableColumns() []utils.Column { cols := []utils.Column{ - timeColumn, + tsrc.postgresql.timeColumn, } if tsrc.postgresql.TagsAsForeignKeys { - cols = append(cols, tagIDColumn) + cols = append(cols, tsrc.postgresql.tagIDColumn) } else { cols = append(cols, tsrc.TagColumns()...) } if tsrc.postgresql.FieldsAsJsonb { - cols = append(cols, fieldsJSONColumn) + cols = append(cols, tsrc.postgresql.fieldsJSONColumn) } else { cols = append(cols, tsrc.FieldColumns()...) } @@ -169,7 +169,7 @@ func (tsrc *TableSource) MetricTableColumns() []utils.Column { func (tsrc *TableSource) TagTableColumns() []utils.Column { cols := []utils.Column{ - tagIDColumn, + tsrc.postgresql.tagIDColumn, } cols = append(cols, tsrc.TagColumns()...)