Add SQL output plugin (#9280)

This commit is contained in:
reimda 2021-06-03 22:49:55 -06:00 committed by GitHub
parent ee44aee1ca
commit e289612ff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1373 additions and 60 deletions

View File

@ -326,6 +326,7 @@ For documentation on the latest development code see the [documentation index][d
* [solr](./plugins/inputs/solr)
* [sql server](./plugins/inputs/sqlserver) (microsoft)
* [stackdriver](./plugins/inputs/stackdriver) (Google Cloud Monitoring)
* [sql](./plugins/outputs/sql) (SQL generic output)
* [statsd](./plugins/inputs/statsd)
* [suricata](./plugins/inputs/suricata)
* [swap](./plugins/inputs/swap)

View File

@ -10,6 +10,7 @@ following works:
- github.com/Azure/azure-event-hubs-go [MIT License](https://github.com/Azure/azure-event-hubs-go/blob/master/LICENSE)
- github.com/Azure/azure-pipeline-go [MIT License](https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE)
- github.com/Azure/azure-sdk-for-go [Apache License 2.0](https://github.com/Azure/azure-sdk-for-go/blob/master/LICENSE)
- github.com/Azure/azure-storage-blob-go [MIT License](https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE)
- github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE)
- github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE)
- github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE)
@ -23,6 +24,7 @@ following works:
- github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE)
- github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE)
- github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE)
- github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt)
- github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE)
- github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE)
- github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING)
@ -32,8 +34,12 @@ following works:
- github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/credentials [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/credentials/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/ec2/imds [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/ec2/imds/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/s3/manager [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/ec2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/accept-encoding/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/presigned-url [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/presigned-url/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/s3shared [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/s3shared/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/s3 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/s3/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/sso [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/sts [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/sts/LICENSE.txt)
- github.com/aws/smithy-go [Apache License 2.0](https://github.com/aws/smithy-go/blob/main/LICENSE)
@ -80,10 +86,12 @@ following works:
- github.com/golang/groupcache [Apache License 2.0](https://github.com/golang/groupcache/blob/master/LICENSE)
- github.com/golang/protobuf [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/protobuf/blob/master/LICENSE)
- github.com/golang/snappy [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/snappy/blob/master/LICENSE)
- github.com/google/flatbuffers [Apache License 2.0](https://github.com/google/flatbuffers/blob/master/LICENSE.txt)
- github.com/google/go-cmp [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-cmp/blob/master/LICENSE)
- github.com/google/go-github [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-github/blob/master/LICENSE)
- github.com/google/go-querystring [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-querystring/blob/master/LICENSE)
- github.com/google/gofuzz [Apache License 2.0](https://github.com/google/gofuzz/blob/master/LICENSE)
- github.com/google/uuid [BSD 3-Clause "New" or "Revised" License](https://github.com/google/uuid/blob/master/LICENSE)
- github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE)
- github.com/googleapis/gnostic [Apache License 2.0](https://github.com/google/gnostic/blob/master/LICENSE)
- github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE)
@ -128,10 +136,10 @@ following works:
- github.com/karrick/godirwalk [BSD 2-Clause "Simplified" License](https://github.com/karrick/godirwalk/blob/master/LICENSE)
- github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE)
- github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE)
- github.com/konsorten/go-windows-terminal-sequences [MIT License](https://github.com/konsorten/go-windows-terminal-sequences/blob/master/LICENSE)
- github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE)
- github.com/mailru/easyjson [MIT License](https://github.com/mailru/easyjson/blob/master/LICENSE)
- github.com/mattn/go-colorable [MIT License](https://github.com/mattn/go-colorable/blob/master/LICENSE)
- github.com/mattn/go-ieproxy [MIT License](https://github.com/mattn/go-ieproxy/blob/master/LICENSE)
- github.com/mattn/go-isatty [MIT License](https://github.com/mattn/go-isatty/blob/master/LICENSE)
- github.com/matttproud/golang_protobuf_extensions [Apache License 2.0](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE)
- github.com/mdlayher/apcupsd [MIT License](https://github.com/mdlayher/apcupsd/blob/master/LICENSE.md)
@ -160,6 +168,7 @@ following works:
- github.com/openzipkin/zipkin-go-opentracing [MIT License](https://github.com/openzipkin/zipkin-go-opentracing/blob/master/LICENSE)
- github.com/philhofer/fwd [MIT License](https://github.com/philhofer/fwd/blob/master/LICENSE.md)
- github.com/pierrec/lz4 [BSD 3-Clause "New" or "Revised" License](https://github.com/pierrec/lz4/blob/master/LICENSE)
- github.com/pkg/browser [BSD 2-Clause "Simplified" License](https://github.com/pkg/browser/blob/master/LICENSE)
- github.com/pkg/errors [BSD 2-Clause "Simplified" License](https://github.com/pkg/errors/blob/master/LICENSE)
- github.com/pmezard/go-difflib [BSD 3-Clause Clear License](https://github.com/pmezard/go-difflib/blob/master/LICENSE)
- github.com/prometheus/client_golang [Apache License 2.0](https://github.com/prometheus/client_golang/blob/master/LICENSE)
@ -168,6 +177,7 @@ following works:
- github.com/prometheus/procfs [Apache License 2.0](https://github.com/prometheus/procfs/blob/master/LICENSE)
- github.com/prometheus/prometheus [Apache License 2.0](https://github.com/prometheus/prometheus/blob/master/LICENSE)
- github.com/rcrowley/go-metrics [MIT License](https://github.com/rcrowley/go-metrics/blob/master/LICENSE)
- github.com/remyoudompheng/bigfft [BSD 3-Clause "New" or "Revised" License](https://github.com/remyoudompheng/bigfft/blob/master/LICENSE)
- github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE)
- github.com/safchain/ethtool [Apache License 2.0](https://github.com/safchain/ethtool/blob/master/LICENSE)
- github.com/samuel/go-zookeeper [BSD 3-Clause Clear License](https://github.com/samuel/go-zookeeper/blob/master/LICENSE)
@ -177,6 +187,7 @@ following works:
- github.com/signalfx/golib [Apache License 2.0](https://github.com/signalfx/golib/blob/master/LICENSE)
- github.com/signalfx/sapm-proto [Apache License 2.0](https://github.com/signalfx/sapm-proto/blob/master/LICENSE)
- github.com/sirupsen/logrus [MIT License](https://github.com/sirupsen/logrus/blob/master/LICENSE)
- github.com/snowflakedb/gosnowflake [Apache License 2.0](https://github.com/snowflakedb/gosnowflake/blob/master/LICENSE)
- github.com/streadway/amqp [BSD 2-Clause "Simplified" License](https://github.com/streadway/amqp/blob/master/LICENSE)
- github.com/stretchr/objx [MIT License](https://github.com/stretchr/objx/blob/master/LICENSE)
- github.com/stretchr/testify [custom -- permissive](https://github.com/stretchr/testify/blob/master/LICENSE)
@ -240,9 +251,10 @@ following works:
- k8s.io/klog [Apache License 2.0](https://github.com/kubernetes/client-go/blob/master/LICENSE)
- k8s.io/utils [Apache License 2.0](https://github.com/kubernetes/client-go/blob/master/LICENSE)
- modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE)
- modernc.org/mathutil [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/mathutil/-/blob/master/LICENSE)
- modernc.org/memory [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/memory/-/blob/master/LICENSE)
- modernc.org/sqlite [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/sqlite/-/blob/master/LICENSE)
- sigs.k8s.io/structured-merge-diff [Apache License 2.0](https://github.com/kubernetes/client-go/blob/master/LICENSE)
- sigs.k8s.io/yaml [Apache License 2.0](https://github.com/kubernetes/client-go/blob/master/LICENSE)
## telegraf used and modified code from these projects
- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE)
- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE)

27
go.mod
View File

@ -27,25 +27,24 @@ require (
github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
github.com/aws/aws-sdk-go v1.34.34
github.com/aws/aws-sdk-go-v2 v1.1.0
github.com/aws/aws-sdk-go-v2/config v1.1.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1
github.com/aws/aws-sdk-go-v2 v1.3.2
github.com/aws/aws-sdk-go-v2/config v1.1.5
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.6
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0
github.com/aws/smithy-go v1.0.0
github.com/aws/smithy-go v1.3.1
github.com/benbjohnson/clock v1.0.3
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmatcuk/doublestar/v3 v3.0.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20190531143454-82441e232cf6
github.com/containerd/containerd v1.4.1 // indirect
github.com/couchbase/go-couchbase v0.0.0-20180501122049-16db1f1fe037
github.com/couchbase/gomemcached v0.0.0-20180502221210-0da75df14530 // indirect
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a // indirect
github.com/denisenkom/go-mssqldb v0.9.0
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dimchansky/utfbom v1.1.1
github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible
github.com/docker/docker v20.10.5+incompatible
github.com/eclipse/paho.mqtt.golang v1.3.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-logfmt/logfmt v0.5.0
@ -85,7 +84,7 @@ require (
github.com/karrick/godirwalk v1.16.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/lib/pq v1.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369
github.com/mdlayher/apcupsd v0.0.0-20200608131503-2bf01da7bf1b
github.com/microsoft/ApplicationInsights-Go v0.4.4
github.com/miekg/dns v1.1.31
@ -102,7 +101,7 @@ require (
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.15.0
github.com/prometheus/procfs v0.1.3
github.com/prometheus/procfs v0.2.0
github.com/prometheus/prometheus v1.8.2-0.20200911110723-e83ef207b6c2
github.com/riemann/riemann-go-client v0.5.0
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664
@ -110,10 +109,12 @@ require (
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/shopspring/decimal v0.0.0-20200105231215-408a2507e114 // indirect
github.com/signalfx/golib/v3 v3.3.0
github.com/sirupsen/logrus v1.6.0
github.com/sirupsen/logrus v1.7.0
github.com/snowflakedb/gosnowflake v1.5.0
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.7.0
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62
github.com/testcontainers/testcontainers-go v0.10.0
github.com/tidwall/gjson v1.6.0
github.com/tinylib/msgp v1.1.5
github.com/tklauser/go-sysconf v0.3.5 // indirect
@ -127,7 +128,7 @@ require (
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
go.starlark.net v0.0.0-20210406145628-7a1108eaa012
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa
@ -135,7 +136,7 @@ require (
golang.org/x/tools v0.1.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a
google.golang.org/grpc v1.37.0
gopkg.in/djherbis/times.v1 v1.2.0
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
@ -143,12 +144,12 @@ require (
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/olivere/elastic.v5 v5.0.70
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v2 v2.4.0
gotest.tools v2.2.0+incompatible
k8s.io/api v0.20.4
k8s.io/apimachinery v0.20.4
k8s.io/client-go v0.20.4
modernc.org/sqlite v1.7.4
modernc.org/sqlite v1.10.8
)
// replaced due to https://github.com/satori/go.uuid/issues/73

400
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -40,6 +40,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/sensu"
_ "github.com/influxdata/telegraf/plugins/outputs/signalfx"
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
_ "github.com/influxdata/telegraf/plugins/outputs/sql"
_ "github.com/influxdata/telegraf/plugins/outputs/stackdriver"
_ "github.com/influxdata/telegraf/plugins/outputs/sumologic"
_ "github.com/influxdata/telegraf/plugins/outputs/syslog"

View File

@ -0,0 +1,150 @@
# SQL Output Plugin
The SQL output plugin saves Telegraf metric data to an SQL database.
The plugin uses a simple, hard-coded database schema. There is a table
for each metric type and the table name is the metric name. There is a
column per field and a column per tag. There is an optional column for
the metric timestamp.
A row is written for every input metric. This means multiple metrics
are never merged into a single row, even if they have the same metric
name, tags, and timestamp.
The plugin uses Golang's generic "database/sql" interface and third
party drivers. See the driver-specific section below for a list of
supported drivers and details. Additional drivers may be added in
future Telegraf releases.
## Getting started
To use the plugin, set the driver setting to the driver name
appropriate for your database. Then set the data source name
(DSN). The format of the DSN varies by driver but often includes a
username, password, the database instance to use, and the hostname of
the database server. The user account must have privileges to insert
rows and create tables.
## Generated SQL
The plugin generates simple ANSI/ISO SQL that is likely to work on any
DBMS. It doesn't use language features that are specific to a
particular DBMS. If you want to use a feature that is specific to a
particular DBMS, you may be able to set it up manually outside of this
plugin or through the init_sql setting.
The insert statements generated by the plugin use placeholder
parameters. Most database drivers use question marks as placeholders
but postgres uses indexed dollar signs. The plugin chooses which
placeholder style to use depending on the driver selected.
## Advanced options
When the plugin first connects it runs SQL from the init_sql setting,
allowing you to perform custom initialization for the connection.
Before inserting a row, the plugin checks whether the table exists. If
it doesn't exist, the plugin creates the table. The existence check
and the table creation statements can be changed through template
settings. The template settings allows you to have the plugin create
customized tables or skip table creation entirely by setting the check
template to any query that executes without error, such as "select 1".
The name of the timestamp column is "timestamp" but it can be changed
with the timestamp\_column setting. The timestamp column can be
completely disabled by setting it to "".
By changing the table creation template, it's possible with some
databases to save a row insertion timestamp. You can add an additional
column with a default value to the template, like "CREATE TABLE
{TABLE}(insertion_timestamp TIMESTAMP DEFAULT CURRENT\_TIMESTAMP,
{COLUMNS})".
The mapping of metric types to sql column types can be customized
through the convert settings.
## Configuration
```
# Save metrics to an SQL Database
[[outputs.sql]]
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
# driver = ""
## Data source name
## The format of the data source name is different for each database driver.
## See the plugin readme for details.
# data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"
## Table creation template
## Available template variables:
## {TABLE} - table name as a quoted identifier
## {TABLELITERAL} - table name as a quoted string literal
## {COLUMNS} - column definitions (list of quoted identifiers and types)
# table_template = "CREATE TABLE {TABLE}({COLUMNS})"
## Table existence check template
## Available template variables:
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
## Initialization SQL
# init_sql = ""
## Metric type to SQL type conversion
#[outputs.sql.convert]
# integer = "INT"
# real = "DOUBLE"
# text = "TEXT"
# timestamp = "TIMESTAMP"
# defaultvalue = "TEXT"
# unsigned = "UNSIGNED"
```
## Driver-specific information
### go-sql-driver/mysql
MySQL default quoting differs from standard ANSI/ISO SQL quoting. You
must use MySQL's ANSI\_QUOTES mode with this plugin. You can enable
this mode by using the setting `init_sql = "SET
sql_mode='ANSI_QUOTES';"` or through a command-line option when
running MySQL. See MySQL's docs for [details on
ANSI\_QUOTES](https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_ansi_quotes)
and [how to set the SQL
mode](https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sql-mode-setting).
You can use a DSN of the format
"username:password@tcp(host:port)/dbname". See the [driver
docs](https://github.com/go-sql-driver/mysql) for details.
### jackc/pgx
You can use a DSN of the format
"postgres://username:password@host:port/dbname". See the [driver
docs](https://github.com/jackc/pgx) for more details.
### modernc.org/sqlite
This driver is not available on all operating systems and
architectures. It is only included in Linux builds on amd64, 386,
arm64, arm, and Darwin on amd64. It is not available for Windows,
FreeBSD, and other Linux and Darwin platforms.
The DSN is a filename or url with scheme "file:". See the [driver
docs](https://modernc.org/sqlite) for details.
### denisenkom/go-mssqldb
Telegraf doesn't have unit tests for go-mssqldb so it should be
treated as experimental.
### snowflakedb/gosnowflake
Telegraf doesn't have unit tests for gosnowflake so it should be
treated as experimental.

277
plugins/outputs/sql/sql.go Normal file
View File

@ -0,0 +1,277 @@
package sql
import (
gosql "database/sql"
"fmt"
"strings"
//Register sql drivers
_ "github.com/denisenkom/go-mssqldb" // mssql (sql server)
_ "github.com/go-sql-driver/mysql" // mysql
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
_ "github.com/snowflakedb/gosnowflake" // snowflake
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)
type ConvertStruct struct {
Integer string
Real string
Text string
Timestamp string
Defaultvalue string
Unsigned string
}
type SQL struct {
Driver string
DataSourceName string
TimestampColumn string
TableTemplate string
TableExistsTemplate string
InitSQL string `toml:"init_sql"`
Convert ConvertStruct
db *gosql.DB
Log telegraf.Logger `toml:"-"`
tables map[string]bool
}
func (p *SQL) Connect() error {
db, err := gosql.Open(p.Driver, p.DataSourceName)
if err != nil {
return err
}
err = db.Ping()
if err != nil {
return err
}
if p.InitSQL != "" {
_, err = db.Exec(p.InitSQL)
if err != nil {
return err
}
}
p.db = db
p.tables = make(map[string]bool)
return nil
}
func (p *SQL) Close() error {
return p.db.Close()
}
// Quote an identifier (table or column name)
func quoteIdent(name string) string {
return `"` + strings.Replace(sanitizeQuoted(name), `"`, `""`, -1) + `"`
}
// Quote a string literal
func quoteStr(name string) string {
return "'" + strings.Replace(name, "'", "''", -1) + "'"
}
func sanitizeQuoted(in string) string {
// https://dev.mysql.com/doc/refman/8.0/en/identifiers.html
// https://www.postgresql.org/docs/13/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
// Whitelist allowed characters
return strings.Map(func(r rune) rune {
switch {
case r >= '\u0001' && r <= '\uFFFF':
return r
default:
return '_'
}
}, in)
}
func (p *SQL) deriveDatatype(value interface{}) string {
var datatype string
switch value.(type) {
case int64:
datatype = p.Convert.Integer
case uint64:
datatype = fmt.Sprintf("%s %s", p.Convert.Integer, p.Convert.Unsigned)
case float64:
datatype = p.Convert.Real
case string:
datatype = p.Convert.Text
default:
datatype = p.Convert.Defaultvalue
p.Log.Errorf("Unknown datatype: '%T' %v", value, value)
}
return datatype
}
var sampleConfig = `
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
# driver = ""
## Data source name
## The format of the data source name is different for each database driver.
## See the plugin readme for details.
# data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"
## Table creation template
## Available template variables:
## {TABLE} - table name as a quoted identifier
## {TABLELITERAL} - table name as a quoted string literal
## {COLUMNS} - column definitions (list of quoted identifiers and types)
# table_template = "CREATE TABLE {TABLE}({COLUMNS})"
## Table existence check template
## Available template variables:
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
## Initialization SQL
# init_sql = ""
## Metric type to SQL type conversion
#[outputs.sql.convert]
# integer = "INT"
# real = "DOUBLE"
# text = "TEXT"
# timestamp = "TIMESTAMP"
# defaultvalue = "TEXT"
# unsigned = "UNSIGNED"
`
func (p *SQL) SampleConfig() string { return sampleConfig }
func (p *SQL) Description() string { return "Send metrics to SQL Database" }
func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
var columns []string
// ## {KEY_COLUMNS} is a comma-separated list of key columns (timestamp and tags)
//var pk []string
if p.TimestampColumn != "" {
//pk = append(pk, quoteIdent(p.TimestampColumn))
columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(p.TimestampColumn), p.Convert.Timestamp))
}
for _, tag := range metric.TagList() {
//pk = append(pk, quoteIdent(tag.Key))
columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(tag.Key), p.Convert.Text))
}
var datatype string
for _, field := range metric.FieldList() {
datatype = p.deriveDatatype(field.Value)
columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(field.Key), datatype))
}
query := p.TableTemplate
query = strings.Replace(query, "{TABLE}", quoteIdent(metric.Name()), -1)
query = strings.Replace(query, "{TABLELITERAL}", quoteStr(metric.Name()), -1)
query = strings.Replace(query, "{COLUMNS}", strings.Join(columns, ","), -1)
//query = strings.Replace(query, "{KEY_COLUMNS}", strings.Join(pk, ","), -1)
return query
}
func (p *SQL) generateInsert(tablename string, columns []string) string {
var placeholders, quotedColumns []string
for _, column := range columns {
quotedColumns = append(quotedColumns, quoteIdent(column))
}
if p.Driver == "pgx" {
// Postgres uses $1 $2 $3 as placeholders
for i := 0; i < len(columns); i++ {
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
} else {
// Everything else uses ? ? ? as placeholders
for i := 0; i < len(columns); i++ {
placeholders = append(placeholders, "?")
}
}
return fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)",
quoteIdent(tablename),
strings.Join(quotedColumns, ","),
strings.Join(placeholders, ","))
}
func (p *SQL) tableExists(tableName string) bool {
stmt := strings.Replace(p.TableExistsTemplate, "{TABLE}", quoteIdent(tableName), -1)
_, err := p.db.Exec(stmt)
return err == nil
}
func (p *SQL) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
tablename := metric.Name()
// create table if needed
if !p.tables[tablename] && !p.tableExists(tablename) {
createStmt := p.generateCreateTable(metric)
_, err := p.db.Exec(createStmt)
if err != nil {
return err
}
p.tables[tablename] = true
}
var columns []string
var values []interface{}
if p.TimestampColumn != "" {
columns = append(columns, p.TimestampColumn)
values = append(values, metric.Time())
}
for column, value := range metric.Tags() {
columns = append(columns, column)
values = append(values, value)
}
for column, value := range metric.Fields() {
columns = append(columns, column)
values = append(values, value)
}
sql := p.generateInsert(tablename, columns)
_, err := p.db.Exec(sql, values...)
if err != nil {
// check if insert error was caused by column mismatch
p.Log.Errorf("Error during insert: %v, %v", err, sql)
return err
}
}
return nil
}
func init() {
outputs.Add("sql", func() telegraf.Output { return newSQL() })
}
func newSQL() *SQL {
return &SQL{
TableTemplate: "CREATE TABLE {TABLE}({COLUMNS})",
TableExistsTemplate: "SELECT 1 FROM {TABLE} LIMIT 1",
TimestampColumn: "timestamp",
Convert: ConvertStruct{
Integer: "INT",
Real: "DOUBLE",
Text: "TEXT",
Timestamp: "TIMESTAMP",
Defaultvalue: "TEXT",
Unsigned: "UNSIGNED",
},
}
}

View File

@ -0,0 +1,329 @@
package sql
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
func TestSqlQuote(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func TestSqlCreateStatement(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func TestSqlInsertStatement(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
}
func pwgen(n int) string {
charset := []byte("abcdedfghijklmnopqrstABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
nchars := len(charset)
buffer := make([]byte, n)
for i := range buffer {
buffer[i] = charset[rand.Intn(nchars)]
}
return string(buffer)
}
func stableMetric(
name string,
tags []telegraf.Tag,
fields []telegraf.Field,
tm time.Time,
tp ...telegraf.ValueType,
) telegraf.Metric {
// We want to compare the output of this plugin with expected
// output. Maps don't preserve order so comparison fails. There's
// no metric constructor that takes a slice of tag and slice of
// field, just the one that takes maps.
//
// To preserve order, construct the metric without tags and fields
// and then add them using AddTag and AddField. Those are stable.
m := metric.New(name, map[string]string{}, map[string]interface{}{}, tm, tp...)
for _, tag := range tags {
m.AddTag(tag.Key, tag.Value)
}
for _, field := range fields {
m.AddField(field.Key, field.Value)
}
return m
}
var (
// 2021-05-17T22:04:45+00:00
// or 2021-05-17T16:04:45-06:00
ts = time.Unix(1621289085, 0).UTC()
testMetrics = []telegraf.Metric{
stableMetric(
"metric_one",
[]telegraf.Tag{
{
Key: "tag_one",
Value: "tag1",
},
{
Key: "tag_two",
Value: "tag2",
},
},
[]telegraf.Field{
{
Key: "int64_one",
Value: int64(1234),
},
{
Key: "int64_two",
Value: int64(2345),
},
},
ts,
),
stableMetric(
"metric_two",
[]telegraf.Tag{
{
Key: "tag_three",
Value: "tag3",
},
},
[]telegraf.Field{
{
Key: "string_one",
Value: "string1",
},
},
ts,
),
stableMetric( //test spaces in metric, tag, and field names
"metric three",
[]telegraf.Tag{
{
Key: "tag four",
Value: "tag4",
},
},
[]telegraf.Field{
{
Key: "string two",
Value: "string2",
},
},
ts,
),
}
)
func TestMysqlIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
initdb, err := filepath.Abs("testdata/mariadb/initdb")
require.NoError(t, err)
// initdb/script.sql creates this database
const dbname = "foo"
// The mariadb image lets you set the root password through an env
// var. We'll use root to insert and query test data.
const username = "root"
password := pwgen(32)
outDir, err := ioutil.TempDir("", "tg-mysql-*")
require.NoError(t, err)
defer os.RemoveAll(outDir)
ctx := context.Background()
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "mariadb",
Env: map[string]string{
"MARIADB_ROOT_PASSWORD": password,
},
BindMounts: map[string]string{
initdb: "/docker-entrypoint-initdb.d",
outDir: "/out",
},
ExposedPorts: []string{"3306/tcp"},
WaitingFor: wait.ForListeningPort("3306/tcp"),
},
Started: true,
}
mariadbContainer, err := testcontainers.GenericContainer(ctx, req)
require.NoError(t, err, "starting container failed")
defer func() {
require.NoError(t, mariadbContainer.Terminate(ctx), "terminating container failed")
}()
// Get the connection details from the container
host, err := mariadbContainer.Host(ctx)
require.NoError(t, err, "getting container host address failed")
require.NotEmpty(t, host)
natPort, err := mariadbContainer.MappedPort(ctx, "3306/tcp")
require.NoError(t, err, "getting container host port failed")
port := natPort.Port()
require.NotEmpty(t, port)
//use the plugin to write to the database
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
username, password, host, port, dbname,
)
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "mysql"
p.DataSourceName = address
//p.Convert.Timestamp = "TEXT" //disable mysql default current_timestamp()
p.InitSQL = "SET sql_mode='ANSI_QUOTES';"
require.NoError(t, p.Connect())
require.NoError(t, p.Write(
testMetrics,
))
//dump the database
var rc int
rc, err = mariadbContainer.Exec(ctx, []string{
"bash",
"-c",
"mariadb-dump --user=" + username +
" --password=" + password +
" --compact --skip-opt " +
dbname +
" > /out/dump",
})
require.NoError(t, err)
require.Equal(t, 0, rc)
dumpfile := filepath.Join(outDir, "dump")
require.FileExists(t, dumpfile)
//compare the dump to what we expected
expected, err := ioutil.ReadFile("testdata/mariadb/expected.sql")
require.NoError(t, err)
actual, err := ioutil.ReadFile(dumpfile)
require.NoError(t, err)
require.Equal(t, string(expected), string(actual))
}
func TestPostgresIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
initdb, err := filepath.Abs("testdata/postgres/initdb")
require.NoError(t, err)
// initdb/init.sql creates this database
const dbname = "foo"
// default username for postgres is postgres
const username = "postgres"
password := pwgen(32)
outDir, err := ioutil.TempDir("", "tg-postgres-*")
require.NoError(t, err)
defer os.RemoveAll(outDir)
ctx := context.Background()
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "postgres",
Env: map[string]string{
"POSTGRES_PASSWORD": password,
},
BindMounts: map[string]string{
initdb: "/docker-entrypoint-initdb.d",
outDir: "/out",
},
ExposedPorts: []string{"5432/tcp"},
WaitingFor: wait.ForListeningPort("5432/tcp"),
},
Started: true,
}
cont, err := testcontainers.GenericContainer(ctx, req)
require.NoError(t, err, "starting container failed")
defer func() {
require.NoError(t, cont.Terminate(ctx), "terminating container failed")
}()
// Get the connection details from the container
host, err := cont.Host(ctx)
require.NoError(t, err, "getting container host address failed")
require.NotEmpty(t, host)
natPort, err := cont.MappedPort(ctx, "5432/tcp")
require.NoError(t, err, "getting container host port failed")
port := natPort.Port()
require.NotEmpty(t, port)
//use the plugin to write to the database
// host, port, username, password, dbname
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
username, password, host, port, dbname,
)
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "pgx"
p.DataSourceName = address
require.NoError(t, p.Connect())
require.NoError(t, p.Write(
testMetrics,
))
//dump the database
//psql -u postgres
var rc int
rc, err = cont.Exec(ctx, []string{
"bash",
"-c",
"pg_dump" +
" --username=" + username +
//" --password=" + password +
// " --compact --skip-opt " +
" --no-comments" +
//" --data-only" +
" " + dbname +
// pg_dump's output has comments that include build info
// of postgres and pg_dump. The build info changes with
// each release. To prevent these changes from causing the
// test to fail, we strip out comments. Also strip out
// blank lines.
"|grep -E -v '(^--|^$)'" +
" > /out/dump 2>&1",
})
require.NoError(t, err)
require.Equal(t, 0, rc)
dumpfile := filepath.Join(outDir, "dump")
require.FileExists(t, dumpfile)
//compare the dump to what we expected
expected, err := ioutil.ReadFile("testdata/postgres/expected.sql")
require.NoError(t, err)
actual, err := ioutil.ReadFile(dumpfile)
require.NoError(t, err)
require.Equal(t, string(expected), string(actual))
}

View File

@ -0,0 +1,15 @@
// +build !mips
// +build !mipsle
// +build !s390x
// +build !ppc64le
// +build !windows
// +build !freebsd
package sql
// The modernc.org sqlite driver isn't supported on all
// platforms. Register it with build constraints to prevent build
// failures on unsupported platforms.
import (
_ "modernc.org/sqlite" // Register sqlite sql driver
)

View File

@ -0,0 +1,134 @@
// +build linux
// +build 386 amd64 arm arm64
package sql
import (
gosql "database/sql"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestSqlite(t *testing.T) {
outDir, err := ioutil.TempDir("", "tg-sqlite-*")
require.NoError(t, err)
defer os.RemoveAll(outDir)
dbfile := filepath.Join(outDir, "db")
// Use the plugin to write to the database address :=
// fmt.Sprintf("file:%v", dbfile)
address := dbfile // accepts a path or a file: URI
p := newSQL()
p.Log = testutil.Logger{}
p.Driver = "sqlite"
p.DataSourceName = address
require.NoError(t, p.Connect())
require.NoError(t, p.Write(
testMetrics,
))
//read directly from the database
db, err := gosql.Open("sqlite", address)
require.NoError(t, err)
defer db.Close()
var countMetricOne int
require.NoError(t, db.QueryRow("select count(*) from metric_one").Scan(&countMetricOne))
require.Equal(t, 1, countMetricOne)
var countMetricTwo int
require.NoError(t, db.QueryRow("select count(*) from metric_one").Scan(&countMetricTwo))
require.Equal(t, 1, countMetricTwo)
var rows *gosql.Rows
// Check that tables were created as expected
rows, err = db.Query("select sql from sqlite_master")
require.NoError(t, err)
var sql string
require.True(t, rows.Next())
require.NoError(t, rows.Scan(&sql))
require.Equal(t,
`CREATE TABLE "metric_one"("timestamp" TIMESTAMP,"tag_one" TEXT,"tag_two" TEXT,"int64_one" INT,"int64_two" INT)`,
sql,
)
require.True(t, rows.Next())
require.NoError(t, rows.Scan(&sql))
require.Equal(t,
`CREATE TABLE "metric_two"("timestamp" TIMESTAMP,"tag_three" TEXT,"string_one" TEXT)`,
sql,
)
require.True(t, rows.Next())
require.NoError(t, rows.Scan(&sql))
require.Equal(t,
`CREATE TABLE "metric three"("timestamp" TIMESTAMP,"tag four" TEXT,"string two" TEXT)`,
sql,
)
require.False(t, rows.Next())
require.NoError(t, rows.Close()) //nolint:sqlclosecheck
// sqlite stores dates as strings. They may be in the local
// timezone. The test needs to parse them back into a time.Time to
// check them.
//timeLayout := "2006-01-02 15:04:05 -0700 MST"
timeLayout := "2006-01-02T15:04:05Z"
var actualTime time.Time
// Check contents of tables
rows, err = db.Query("select timestamp, tag_one, tag_two, int64_one, int64_two from metric_one")
require.NoError(t, err)
require.True(t, rows.Next())
var (
a string
b, c string
d, e int64
)
require.NoError(t, rows.Scan(&a, &b, &c, &d, &e))
actualTime, err = time.Parse(timeLayout, a)
require.NoError(t, err)
require.Equal(t, ts, actualTime.UTC())
require.Equal(t, "tag1", b)
require.Equal(t, "tag2", c)
require.Equal(t, int64(1234), d)
require.Equal(t, int64(2345), e)
require.False(t, rows.Next())
require.NoError(t, rows.Close()) //nolint:sqlclosecheck
rows, err = db.Query("select timestamp, tag_three, string_one from metric_two")
require.NoError(t, err)
require.True(t, rows.Next())
var (
f, g, h string
)
require.NoError(t, rows.Scan(&f, &g, &h))
actualTime, err = time.Parse(timeLayout, f)
require.NoError(t, err)
require.Equal(t, ts, actualTime.UTC())
require.Equal(t, "tag3", g)
require.Equal(t, "string1", h)
require.False(t, rows.Next())
require.NoError(t, rows.Close()) //nolint:sqlclosecheck
rows, err = db.Query(`select timestamp, "tag four", "string two" from "metric three"`)
require.NoError(t, err)
require.True(t, rows.Next())
var (
i, j, k string
)
require.NoError(t, rows.Scan(&i, &j, &k))
actualTime, err = time.Parse(timeLayout, i)
require.NoError(t, err)
require.Equal(t, ts, actualTime.UTC())
require.Equal(t, "tag4", j)
require.Equal(t, "string2", k)
require.False(t, rows.Next())
require.NoError(t, rows.Close()) //nolint:sqlclosecheck
}

View File

@ -0,0 +1,36 @@
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `bar` (
`baz` int(11) DEFAULT NULL
);
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `bar` VALUES (1);
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `metric three` (
`timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
`tag four` text DEFAULT NULL,
`string two` text DEFAULT NULL
);
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `metric three` VALUES ('2021-05-17 22:04:45','tag4','string2');
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `metric_one` (
`timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
`tag_one` text DEFAULT NULL,
`tag_two` text DEFAULT NULL,
`int64_one` int(11) DEFAULT NULL,
`int64_two` int(11) DEFAULT NULL
);
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `metric_one` VALUES ('2021-05-17 22:04:45','tag1','tag2',1234,2345);
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `metric_two` (
`timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
`tag_three` text DEFAULT NULL,
`string_one` text DEFAULT NULL
);
/*!40101 SET character_set_client = @saved_cs_client */;
INSERT INTO `metric_two` VALUES ('2021-05-17 22:04:45','tag3','string1');

View File

@ -0,0 +1,4 @@
create database foo;
use foo;
create table bar (baz int);
insert into bar (baz) values (1);

View File

@ -0,0 +1,41 @@
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
CREATE TABLE public."metric three" (
"timestamp" timestamp without time zone,
"tag four" text,
"string two" text
);
ALTER TABLE public."metric three" OWNER TO postgres;
CREATE TABLE public.metric_one (
"timestamp" timestamp without time zone,
tag_one text,
tag_two text,
int64_one integer,
int64_two integer
);
ALTER TABLE public.metric_one OWNER TO postgres;
CREATE TABLE public.metric_two (
"timestamp" timestamp without time zone,
tag_three text,
string_one text
);
ALTER TABLE public.metric_two OWNER TO postgres;
COPY public."metric three" ("timestamp", "tag four", "string two") FROM stdin;
2021-05-17 22:04:45 tag4 string2
\.
COPY public.metric_one ("timestamp", tag_one, tag_two, int64_one, int64_two) FROM stdin;
2021-05-17 22:04:45 tag1 tag2 1234 2345
\.
COPY public.metric_two ("timestamp", tag_three, string_one) FROM stdin;
2021-05-17 22:04:45 tag3 string1
\.

View File

@ -0,0 +1,2 @@
create database foo;