fix(inputs.postgresql_extensible): Use same timestamp for each gather (#15401)
This commit is contained in:
parent
82902ebd06
commit
898b1c3e2c
|
|
@ -114,17 +114,21 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||||
dbVersion = 0
|
dbVersion = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set default timestamp to Now and use for all generated metrics during
|
||||||
|
// the same Gather call
|
||||||
|
timestamp := time.Now()
|
||||||
|
|
||||||
// We loop in order to process each query
|
// We loop in order to process each query
|
||||||
// Query is not run if Database version does not match the query version.
|
// Query is not run if Database version does not match the query version.
|
||||||
for _, q := range p.Query {
|
for _, q := range p.Query {
|
||||||
if q.MinVersion <= dbVersion && (q.MaxVersion == 0 || q.MaxVersion > dbVersion) {
|
if q.MinVersion <= dbVersion && (q.MaxVersion == 0 || q.MaxVersion > dbVersion) {
|
||||||
acc.AddError(p.gatherMetricsFromQuery(acc, q))
|
acc.AddError(p.gatherMetricsFromQuery(acc, q, timestamp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query) error {
|
func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, timestamp time.Time) error {
|
||||||
rows, err := p.service.DB.Query(q.Sqlquery)
|
rows, err := p.service.DB.Query(q.Sqlquery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -139,7 +143,7 @@ func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query) e
|
||||||
}
|
}
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err := p.accRow(acc, rows, columns, q); err != nil {
|
if err := p.accRow(acc, rows, columns, q, timestamp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -150,7 +154,7 @@ type scanner interface {
|
||||||
Scan(dest ...interface{}) error
|
Scan(dest ...interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query) error {
|
func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query, timestamp time.Time) error {
|
||||||
// this is where we'll store the column name with its *interface{}
|
// this is where we'll store the column name with its *interface{}
|
||||||
columnMap := make(map[string]*interface{})
|
columnMap := make(map[string]*interface{})
|
||||||
|
|
||||||
|
|
@ -188,9 +192,6 @@ func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []str
|
||||||
"db": dbname.String(),
|
"db": dbname.String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// set default timestamp to Now
|
|
||||||
timestamp := time.Now()
|
|
||||||
|
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
for col, val := range columnMap {
|
for col, val := range columnMap {
|
||||||
p.Log.Debugf("Column: %s = %T: %v\n", col, *val, *val)
|
p.Log.Debugf("Column: %s = %T: %v\n", col, *val, *val)
|
||||||
|
|
|
||||||
|
|
@ -338,7 +338,7 @@ func TestAccRow(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
q := query{Measurement: "pgTEST", additionalTags: make(map[string]bool)}
|
q := query{Measurement: "pgTEST", additionalTags: make(map[string]bool)}
|
||||||
require.NoError(t, p.accRow(&acc, tt.fields, columns, q))
|
require.NoError(t, p.accRow(&acc, tt.fields, columns, q, time.Now()))
|
||||||
require.Len(t, acc.Metrics, 1)
|
require.Len(t, acc.Metrics, 1)
|
||||||
metric := acc.Metrics[0]
|
metric := acc.Metrics[0]
|
||||||
require.Equal(t, tt.dbName, metric.Tags["db"])
|
require.Equal(t, tt.dbName, metric.Tags["db"])
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue