Add timestamp column support to postgresql_extensible (#8602)
This commit is contained in:
parent
3b87438dea
commit
0c99ae9e1d
|
|
@ -52,12 +52,17 @@ The example below has two queries are specified, with the following parameters:
|
||||||
# defined tags. The values in these columns must be of a string-type,
|
# defined tags. The values in these columns must be of a string-type,
|
||||||
# a number-type or a blob-type.
|
# a number-type or a blob-type.
|
||||||
#
|
#
|
||||||
|
# The timestamp field is used to override the data points timestamp value. By
|
||||||
|
# default, all rows inserted with current time. By setting a timestamp column,
|
||||||
|
# the row will be inserted with that column's value.
|
||||||
|
#
|
||||||
# Structure :
|
# Structure :
|
||||||
# [[inputs.postgresql_extensible.query]]
|
# [[inputs.postgresql_extensible.query]]
|
||||||
# sqlquery string
|
# sqlquery string
|
||||||
# version string
|
# version string
|
||||||
# withdbname boolean
|
# withdbname boolean
|
||||||
# tagvalue string (coma separated)
|
# tagvalue string (coma separated)
|
||||||
|
# timestamp string
|
||||||
[[inputs.postgresql_extensible.query]]
|
[[inputs.postgresql_extensible.query]]
|
||||||
sqlquery="SELECT * FROM pg_stat_database where datname"
|
sqlquery="SELECT * FROM pg_stat_database where datname"
|
||||||
version=901
|
version=901
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
_ "github.com/jackc/pgx/stdlib"
|
_ "github.com/jackc/pgx/stdlib"
|
||||||
|
|
||||||
|
|
@ -19,6 +20,7 @@ type Postgresql struct {
|
||||||
postgresql.Service
|
postgresql.Service
|
||||||
Databases []string
|
Databases []string
|
||||||
AdditionalTags []string
|
AdditionalTags []string
|
||||||
|
Timestamp string
|
||||||
Query query
|
Query query
|
||||||
Debug bool
|
Debug bool
|
||||||
|
|
||||||
|
|
@ -32,6 +34,7 @@ type query []struct {
|
||||||
Withdbname bool
|
Withdbname bool
|
||||||
Tagvalue string
|
Tagvalue string
|
||||||
Measurement string
|
Measurement string
|
||||||
|
Timestamp string
|
||||||
}
|
}
|
||||||
|
|
||||||
var ignoredColumns = map[string]bool{"stats_reset": true}
|
var ignoredColumns = map[string]bool{"stats_reset": true}
|
||||||
|
|
@ -82,6 +85,15 @@ var sampleConfig = `
|
||||||
## The script option can be used to specify the .sql file path.
|
## The script option can be used to specify the .sql file path.
|
||||||
## If script and sqlquery options specified at same time, sqlquery will be used
|
## If script and sqlquery options specified at same time, sqlquery will be used
|
||||||
##
|
##
|
||||||
|
## the tagvalue field is used to define custom tags (separated by comas).
|
||||||
|
## the query is expected to return columns which match the names of the
|
||||||
|
## defined tags. The values in these columns must be of a string-type,
|
||||||
|
## a number-type or a blob-type.
|
||||||
|
##
|
||||||
|
## The timestamp field is used to override the data points timestamp value. By
|
||||||
|
## default, all rows inserted with current time. By setting a timestamp column,
|
||||||
|
## the row will be inserted with that column's value.
|
||||||
|
##
|
||||||
## Structure :
|
## Structure :
|
||||||
## [[inputs.postgresql_extensible.query]]
|
## [[inputs.postgresql_extensible.query]]
|
||||||
## sqlquery string
|
## sqlquery string
|
||||||
|
|
@ -89,6 +101,7 @@ var sampleConfig = `
|
||||||
## withdbname boolean
|
## withdbname boolean
|
||||||
## tagvalue string (comma separated)
|
## tagvalue string (comma separated)
|
||||||
## measurement string
|
## measurement string
|
||||||
|
## timestamp string
|
||||||
[[inputs.postgresql_extensible.query]]
|
[[inputs.postgresql_extensible.query]]
|
||||||
sqlquery="SELECT * FROM pg_stat_database"
|
sqlquery="SELECT * FROM pg_stat_database"
|
||||||
version=901
|
version=901
|
||||||
|
|
@ -150,6 +163,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||||
query string
|
query string
|
||||||
tag_value string
|
tag_value string
|
||||||
meas_name string
|
meas_name string
|
||||||
|
timestamp string
|
||||||
columns []string
|
columns []string
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -164,6 +178,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||||
for i := range p.Query {
|
for i := range p.Query {
|
||||||
sql_query = p.Query[i].Sqlquery
|
sql_query = p.Query[i].Sqlquery
|
||||||
tag_value = p.Query[i].Tagvalue
|
tag_value = p.Query[i].Tagvalue
|
||||||
|
timestamp = p.Query[i].Timestamp
|
||||||
|
|
||||||
if p.Query[i].Measurement != "" {
|
if p.Query[i].Measurement != "" {
|
||||||
meas_name = p.Query[i].Measurement
|
meas_name = p.Query[i].Measurement
|
||||||
|
|
@ -206,6 +221,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.Timestamp = timestamp
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
err = p.accRow(meas_name, rows, acc, columns)
|
err = p.accRow(meas_name, rows, acc, columns)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -228,6 +245,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
|
||||||
columnVars []interface{}
|
columnVars []interface{}
|
||||||
dbname bytes.Buffer
|
dbname bytes.Buffer
|
||||||
tagAddress string
|
tagAddress string
|
||||||
|
timestamp time.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
// this is where we'll store the column name with its *interface{}
|
// this is where we'll store the column name with its *interface{}
|
||||||
|
|
@ -269,6 +287,9 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
|
||||||
"db": dbname.String(),
|
"db": dbname.String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set default timestamp to Now
|
||||||
|
timestamp = time.Now()
|
||||||
|
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
COLUMN:
|
COLUMN:
|
||||||
for col, val := range columnMap {
|
for col, val := range columnMap {
|
||||||
|
|
@ -278,6 +299,13 @@ COLUMN:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if col == p.Timestamp {
|
||||||
|
if v, ok := (*val).(time.Time); ok {
|
||||||
|
timestamp = v
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for _, tag := range p.AdditionalTags {
|
for _, tag := range p.AdditionalTags {
|
||||||
if col != tag {
|
if col != tag {
|
||||||
continue
|
continue
|
||||||
|
|
@ -301,7 +329,7 @@ COLUMN:
|
||||||
fields[col] = *val
|
fields[col] = *val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
acc.AddFields(meas_name, fields, tags)
|
acc.AddFields(meas_name, fields, tags, timestamp)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
|
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -126,6 +127,13 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
|
||||||
assert.True(t, found)
|
assert.True(t, found)
|
||||||
assert.Equal(t, true, v)
|
assert.Equal(t, true, v)
|
||||||
},
|
},
|
||||||
|
"SELECT timestamp'1980-07-23' as ts, true AS myvalue": func(acc *testutil.Accumulator) {
|
||||||
|
expectedTime := time.Date(1980, 7, 23, 0, 0, 0, 0, time.UTC)
|
||||||
|
v, found := acc.BoolField(measurement, "myvalue")
|
||||||
|
assert.True(t, found)
|
||||||
|
assert.Equal(t, true, v)
|
||||||
|
assert.True(t, acc.HasTimestamp(measurement, expectedTime))
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for q, assertions := range examples {
|
for q, assertions := range examples {
|
||||||
|
|
@ -134,6 +142,7 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
|
||||||
Version: 901,
|
Version: 901,
|
||||||
Withdbname: false,
|
Withdbname: false,
|
||||||
Tagvalue: "",
|
Tagvalue: "",
|
||||||
|
Timestamp: "ts",
|
||||||
}})
|
}})
|
||||||
assertions(acc)
|
assertions(acc)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue