feat(outputs.postgresql): Add option to rename time column (#13750)
This commit is contained in:
parent
879c42d26c
commit
17c7c0252b
|
|
@ -54,6 +54,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## Store all fields as a JSONB object in a single 'fields' column.
|
## Store all fields as a JSONB object in a single 'fields' column.
|
||||||
# fields_as_jsonb = false
|
# fields_as_jsonb = false
|
||||||
|
|
||||||
|
## Name of the timestamp column
|
||||||
|
## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
|
||||||
|
# timestamp_column_name = "time"
|
||||||
|
|
||||||
## Templated statements to execute when creating a new table.
|
## Templated statements to execute when creating a new table.
|
||||||
# create_templates = [
|
# create_templates = [
|
||||||
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
|
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
|
||||||
|
|
|
||||||
|
|
@ -2,22 +2,6 @@ package postgresql
|
||||||
|
|
||||||
import "github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
import "github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||||||
|
|
||||||
// Column names and data types for standard fields (time, tag_id, tags, and fields)
|
|
||||||
const (
|
|
||||||
timeColumnName = "time"
|
|
||||||
timeColumnDataType = PgTimestampWithoutTimeZone
|
|
||||||
tagIDColumnName = "tag_id"
|
|
||||||
tagIDColumnDataType = PgBigInt
|
|
||||||
tagsJSONColumnName = "tags"
|
|
||||||
fieldsJSONColumnName = "fields"
|
|
||||||
jsonColumnDataType = PgJSONb
|
|
||||||
)
|
|
||||||
|
|
||||||
var timeColumn = utils.Column{Name: timeColumnName, Type: timeColumnDataType, Role: utils.TimeColType}
|
|
||||||
var tagIDColumn = utils.Column{Name: tagIDColumnName, Type: tagIDColumnDataType, Role: utils.TagsIDColType}
|
|
||||||
var fieldsJSONColumn = utils.Column{Name: fieldsJSONColumnName, Type: jsonColumnDataType, Role: utils.FieldColType}
|
|
||||||
var tagsJSONColumn = utils.Column{Name: tagsJSONColumnName, Type: jsonColumnDataType, Role: utils.TagColType}
|
|
||||||
|
|
||||||
func (p *Postgresql) columnFromTag(key string, value interface{}) utils.Column {
|
func (p *Postgresql) columnFromTag(key string, value interface{}) utils.Column {
|
||||||
return utils.Column{Name: key, Type: p.derivePgDatatype(value), Role: utils.TagColType}
|
return utils.Column{Name: key, Type: p.derivePgDatatype(value), Role: utils.TagColType}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,7 @@ type Postgresql struct {
|
||||||
ForeignTagConstraint bool `toml:"foreign_tag_constraint"`
|
ForeignTagConstraint bool `toml:"foreign_tag_constraint"`
|
||||||
TagsAsJsonb bool `toml:"tags_as_jsonb"`
|
TagsAsJsonb bool `toml:"tags_as_jsonb"`
|
||||||
FieldsAsJsonb bool `toml:"fields_as_jsonb"`
|
FieldsAsJsonb bool `toml:"fields_as_jsonb"`
|
||||||
|
TimestampColumnName string `toml:"timestamp_column_name"`
|
||||||
CreateTemplates []*sqltemplate.Template `toml:"create_templates"`
|
CreateTemplates []*sqltemplate.Template `toml:"create_templates"`
|
||||||
AddColumnTemplates []*sqltemplate.Template `toml:"add_column_templates"`
|
AddColumnTemplates []*sqltemplate.Template `toml:"add_column_templates"`
|
||||||
TagTableCreateTemplates []*sqltemplate.Template `toml:"tag_table_create_templates"`
|
TagTableCreateTemplates []*sqltemplate.Template `toml:"tag_table_create_templates"`
|
||||||
|
|
@ -49,6 +50,7 @@ type Postgresql struct {
|
||||||
RetryMaxBackoff config.Duration `toml:"retry_max_backoff"`
|
RetryMaxBackoff config.Duration `toml:"retry_max_backoff"`
|
||||||
TagCacheSize int `toml:"tag_cache_size"`
|
TagCacheSize int `toml:"tag_cache_size"`
|
||||||
LogLevel string `toml:"log_level"`
|
LogLevel string `toml:"log_level"`
|
||||||
|
Logger telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
dbContext context.Context
|
dbContext context.Context
|
||||||
dbContextCancel func()
|
dbContextCancel func()
|
||||||
|
|
@ -62,7 +64,11 @@ type Postgresql struct {
|
||||||
writeChan chan *TableSource
|
writeChan chan *TableSource
|
||||||
writeWaitGroup *utils.WaitGroup
|
writeWaitGroup *utils.WaitGroup
|
||||||
|
|
||||||
Logger telegraf.Logger `toml:"-"`
|
// Column types
|
||||||
|
timeColumn utils.Column
|
||||||
|
tagIDColumn utils.Column
|
||||||
|
fieldsJSONColumn utils.Column
|
||||||
|
tagsJSONColumn utils.Column
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
@ -99,6 +105,21 @@ func (p *Postgresql) Init() error {
|
||||||
return fmt.Errorf("invalid tag_cache_size")
|
return fmt.Errorf("invalid tag_cache_size")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the time-column name
|
||||||
|
if p.TimestampColumnName == "" {
|
||||||
|
p.TimestampColumnName = "time"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the column prototypes
|
||||||
|
p.timeColumn = utils.Column{
|
||||||
|
Name: p.TimestampColumnName,
|
||||||
|
Type: PgTimestampWithoutTimeZone,
|
||||||
|
Role: utils.TimeColType,
|
||||||
|
}
|
||||||
|
p.tagIDColumn = utils.Column{Name: "tag_id", Type: PgBigInt, Role: utils.TagsIDColType}
|
||||||
|
p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType}
|
||||||
|
p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if p.dbConfig, err = pgxpool.ParseConfig(p.Connection); err != nil {
|
if p.dbConfig, err = pgxpool.ParseConfig(p.Connection); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
@ -588,6 +589,39 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
|
||||||
require.True(t, haveError, "write error not found in log")
|
require.True(t, haveError, "write error not found in log")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTimestampColumnNameIntegration(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
p := newPostgresqlTest(t)
|
||||||
|
p.TimestampColumnName = "timestamp"
|
||||||
|
require.NoError(t, p.Init())
|
||||||
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
metric.New(t.Name(), map[string]string{}, map[string]interface{}{"v": 42}, time.Unix(1691747345, 0)),
|
||||||
|
}
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
|
||||||
|
dump := dbTableDump(t, p.db, "")
|
||||||
|
require.Len(t, dump, 1)
|
||||||
|
require.EqualValues(t, 42, dump[0]["v"])
|
||||||
|
require.EqualValues(t, time.Unix(1691747345, 0).UTC(), dump[0]["timestamp"])
|
||||||
|
require.NotContains(t, dump[0], "time")
|
||||||
|
|
||||||
|
p.Logger.Clear()
|
||||||
|
require.NoError(t, p.Write(metrics))
|
||||||
|
|
||||||
|
stmtCount := 0
|
||||||
|
for _, log := range p.Logger.Logs() {
|
||||||
|
if strings.Contains(log.String(), "info: PG ") {
|
||||||
|
stmtCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteTagTableIntegration(t *testing.T) {
|
func TestWriteTagTableIntegration(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,10 @@
|
||||||
## Store all fields as a JSONB object in a single 'fields' column.
|
## Store all fields as a JSONB object in a single 'fields' column.
|
||||||
# fields_as_jsonb = false
|
# fields_as_jsonb = false
|
||||||
|
|
||||||
|
## Name of the timestamp column
|
||||||
|
## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
|
||||||
|
# timestamp_column_name = "time"
|
||||||
|
|
||||||
## Templated statements to execute when creating a new table.
|
## Templated statements to execute when creating a new table.
|
||||||
# create_templates = [
|
# create_templates = [
|
||||||
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
|
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
|
||||||
|
|
|
||||||
|
|
@ -318,16 +318,17 @@ func (tm *TableManager) getColumns(ctx context.Context, db dbh, name string) (ma
|
||||||
|
|
||||||
role := utils.FieldColType
|
role := utils.FieldColType
|
||||||
switch colName {
|
switch colName {
|
||||||
case timeColumnName:
|
case tm.timeColumn.Name:
|
||||||
role = utils.TimeColType
|
role = utils.TimeColType
|
||||||
case tagIDColumnName:
|
case tm.tagIDColumn.Name:
|
||||||
role = utils.TagsIDColType
|
role = utils.TagsIDColType
|
||||||
case tagsJSONColumnName:
|
case tm.tagsJSONColumn.Name:
|
||||||
role = utils.TagColType
|
role = utils.TagColType
|
||||||
case fieldsJSONColumnName:
|
case tm.fieldsJSONColumn.Name:
|
||||||
role = utils.FieldColType
|
role = utils.FieldColType
|
||||||
default:
|
default:
|
||||||
// We don't want to monopolize the column comment (preventing user from storing other information there), so just look at the first word
|
// We don't want to monopolize the column comment (preventing user from storing other information there),
|
||||||
|
// so just look at the first word
|
||||||
if desc != nil {
|
if desc != nil {
|
||||||
descWords := strings.Split(*desc, " ")
|
descWords := strings.Split(*desc, " ")
|
||||||
if descWords[0] == "tag" {
|
if descWords[0] == "tag" {
|
||||||
|
|
|
||||||
|
|
@ -133,7 +133,7 @@ func (tsrc *TableSource) TagColumns() []utils.Column {
|
||||||
var cols []utils.Column
|
var cols []utils.Column
|
||||||
|
|
||||||
if tsrc.postgresql.TagsAsJsonb {
|
if tsrc.postgresql.TagsAsJsonb {
|
||||||
cols = append(cols, tagsJSONColumn)
|
cols = append(cols, tsrc.postgresql.tagsJSONColumn)
|
||||||
} else {
|
} else {
|
||||||
cols = append(cols, tsrc.tagColumns.columns...)
|
cols = append(cols, tsrc.tagColumns.columns...)
|
||||||
}
|
}
|
||||||
|
|
@ -149,17 +149,17 @@ func (tsrc *TableSource) FieldColumns() []utils.Column {
|
||||||
// MetricTableColumns returns the full column list, including time, tag id or tags, and fields.
|
// MetricTableColumns returns the full column list, including time, tag id or tags, and fields.
|
||||||
func (tsrc *TableSource) MetricTableColumns() []utils.Column {
|
func (tsrc *TableSource) MetricTableColumns() []utils.Column {
|
||||||
cols := []utils.Column{
|
cols := []utils.Column{
|
||||||
timeColumn,
|
tsrc.postgresql.timeColumn,
|
||||||
}
|
}
|
||||||
|
|
||||||
if tsrc.postgresql.TagsAsForeignKeys {
|
if tsrc.postgresql.TagsAsForeignKeys {
|
||||||
cols = append(cols, tagIDColumn)
|
cols = append(cols, tsrc.postgresql.tagIDColumn)
|
||||||
} else {
|
} else {
|
||||||
cols = append(cols, tsrc.TagColumns()...)
|
cols = append(cols, tsrc.TagColumns()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if tsrc.postgresql.FieldsAsJsonb {
|
if tsrc.postgresql.FieldsAsJsonb {
|
||||||
cols = append(cols, fieldsJSONColumn)
|
cols = append(cols, tsrc.postgresql.fieldsJSONColumn)
|
||||||
} else {
|
} else {
|
||||||
cols = append(cols, tsrc.FieldColumns()...)
|
cols = append(cols, tsrc.FieldColumns()...)
|
||||||
}
|
}
|
||||||
|
|
@ -169,7 +169,7 @@ func (tsrc *TableSource) MetricTableColumns() []utils.Column {
|
||||||
|
|
||||||
func (tsrc *TableSource) TagTableColumns() []utils.Column {
|
func (tsrc *TableSource) TagTableColumns() []utils.Column {
|
||||||
cols := []utils.Column{
|
cols := []utils.Column{
|
||||||
tagIDColumn,
|
tsrc.postgresql.tagIDColumn,
|
||||||
}
|
}
|
||||||
|
|
||||||
cols = append(cols, tsrc.TagColumns()...)
|
cols = append(cols, tsrc.TagColumns()...)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue