feat: add mongodb output plugin (#9923)

This commit is contained in:
bustedware 2021-10-28 16:42:49 -04:00 committed by GitHub
parent 343e846480
commit 7d6672c53a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 872 additions and 15 deletions

View File

@ -32,7 +32,6 @@ following works:
- github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE)
- github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING)
- github.com/armon/go-metrics [MIT License](https://github.com/armon/go-metrics/blob/master/LICENSE)
- github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/credentials [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/credentials/LICENSE.txt)

3
go.mod
View File

@ -273,7 +273,7 @@ require (
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect
go.etcd.io/etcd/api/v3 v3.5.0 // indirect
go.mongodb.org/mongo-driver v1.5.3
go.mongodb.org/mongo-driver v1.7.3
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.35.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0
@ -334,7 +334,6 @@ require (
)
require (
github.com/aws/aws-sdk-go v1.38.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.4.0 // indirect
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect

5
go.sum
View File

@ -299,7 +299,6 @@ github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/aws/aws-sdk-go v1.38.3 h1:QCL/le04oAz2jELMRSuJVjGT7H+4hhoQc66eMPCfU/k=
github.com/aws/aws-sdk-go v1.38.3/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.1.0/go.mod h1:smfAbmpW+tcRVuNUjo3MOArSZmW72t62rkCzc2i0TWM=
@ -2134,8 +2133,8 @@ go.mongodb.org/mongo-driver v1.4.4/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4S
go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.mongodb.org/mongo-driver v1.5.2/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.mongodb.org/mongo-driver v1.5.3 h1:wWbFB6zaGHpzguF3f7tW94sVE8sFl3lHx8OZx/4OuFI=
go.mongodb.org/mongo-driver v1.5.3/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.mongodb.org/mongo-driver v1.7.3 h1:G4l/eYY9VrQAK/AUgkV0koQKzQnyddnWxrd/Etf0jIs=
go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg=
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=

View File

@ -4,9 +4,10 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/influxdata/telegraf/internal/choice"
"os"
"strings"
"github.com/influxdata/telegraf/internal/choice"
)
// ClientConfig represents the standard client TLS config.
@ -14,6 +15,7 @@ type ClientConfig struct {
TLSCA string `toml:"tls_ca"`
TLSCert string `toml:"tls_cert"`
TLSKey string `toml:"tls_key"`
TLSKeyPwd string `toml:"tls_key_pwd"`
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
ServerName string `toml:"tls_server_name"`
@ -27,6 +29,7 @@ type ClientConfig struct {
type ServerConfig struct {
TLSCert string `toml:"tls_cert"`
TLSKey string `toml:"tls_key"`
TLSKeyPwd string `toml:"tls_key_pwd"`
TLSAllowedCACerts []string `toml:"tls_allowed_cacerts"`
TLSCipherSuites []string `toml:"tls_cipher_suites"`
TLSMinVersion string `toml:"tls_min_version"`

View File

@ -33,6 +33,15 @@ func TestClientConfig(t *testing.T) {
TLSKey: pki.ClientKeyPath(),
},
},
{
name: "success with tls key password set",
client: tls.ClientConfig{
TLSCA: pki.CACertPath(),
TLSCert: pki.ClientCertPath(),
TLSKey: pki.ClientKeyPath(),
TLSKeyPwd: "",
},
},
{
name: "invalid ca",
client: tls.ClientConfig{
@ -137,6 +146,18 @@ func TestServerConfig(t *testing.T) {
TLSMaxVersion: pki.TLSMaxVersion(),
},
},
{
name: "success with tls key password set",
server: tls.ServerConfig{
TLSCert: pki.ServerCertPath(),
TLSKey: pki.ServerKeyPath(),
TLSKeyPwd: "",
TLSAllowedCACerts: []string{pki.CACertPath()},
TLSCipherSuites: []string{pki.CipherSuite()},
TLSMinVersion: pki.TLSMinVersion(),
TLSMaxVersion: pki.TLSMaxVersion(),
},
},
{
name: "missing tls cipher suites is okay",
server: tls.ServerConfig{

View File

@ -31,6 +31,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
_ "github.com/influxdata/telegraf/plugins/outputs/logzio"
_ "github.com/influxdata/telegraf/plugins/outputs/loki"
_ "github.com/influxdata/telegraf/plugins/outputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/outputs/mqtt"
_ "github.com/influxdata/telegraf/plugins/outputs/nats"
_ "github.com/influxdata/telegraf/plugins/outputs/newrelic"

View File

@ -0,0 +1,43 @@
# MongoDB Output Plugin
This plugin sends metrics to MongoDB and automatically creates the collections as time series collections when they don't already exist.
**Please note:** Requires MongoDB 5.0+ for Time Series Collections
### Configuration:
```toml
# A plugin that can transmit logs to mongodb
[[outputs.mongodb]]
# connection string examples for mongodb
dsn = "mongodb://localhost:27017"
# dsn = "mongodb://mongod1:27017,mongod2:27017,mongod3:27017/admin&replicaSet=myReplSet&w=1"
# overrides serverSelectionTimeoutMS in dsn if set
# timeout = "30s"
# default authentication, optional
# authentication = "NONE"
# for SCRAM-SHA-256 authentication
# authentication = "SCRAM"
# username = "root"
# password = "***"
# for x509 certificate authentication
# authentication = "X509"
# tls_ca = "ca.pem"
# tls_key = "client.pem"
# # tls_key_pwd = "changeme" # required for encrypted tls_key
# insecure_skip_verify = false
# database to store measurements and time series collections
# database = "telegraf"
# granularity can be seconds, minutes, or hours.
# configuring this value will be based on your input collection frequency.
# see https://docs.mongodb.com/manual/core/timeseries-collections/#create-a-time-series-collection
# granularity = "seconds"
# optionally set a TTL to automatically expire documents from the measurement collections.
# ttl = "360h"
```

View File

@ -0,0 +1,22 @@
FROM docker.io/library/mongo:latest
RUN apt-get update && \
apt-get install -y openssh-client
WORKDIR /var/log
RUN mkdir -p mongodb_noauth/ mongodb_scram/ mongodb_x509/ mongodb_x509_expire/
WORKDIR /opt
COPY ./testutil/pki/tls-certs.sh .
RUN mkdir -p data/noauth data/scram data/x509 data/x509_expire
RUN /opt/tls-certs.sh
COPY ./plugins/outputs/mongodb/dev/mongodb.sh .
RUN chmod +x mongodb.sh
EXPOSE 27017
EXPOSE 27018
EXPOSE 27019
EXPOSE 27020
CMD ./mongodb.sh

View File

@ -0,0 +1,34 @@
#!/bin/bash
# no auth
mongod --dbpath data/noauth --fork --logpath /var/log/mongodb_noauth/mongod.log --bind_ip 0.0.0.0 --port 27017
# scram auth
mongod --dbpath data/scram --fork --logpath /var/log/mongodb_scram/mongod.log --bind_ip 0.0.0.0 --port 27018
mongo localhost:27018/admin --eval "db.createUser({user:\"root\", pwd:\"changeme\", roles:[{role:\"root\",db:\"admin\"}]})"
mongo localhost:27018/admin --eval "db.shutdownServer()"
mongod --dbpath data/scram --fork --logpath /var/log/mongodb_scram/mongod.log --auth --setParameter authenticationMechanisms=SCRAM-SHA-256 --bind_ip 0.0.0.0 --port 27018
# get client certificate subject for creating x509 authenticating user
dn=$(openssl x509 -in ./private/client.pem -noout -subject -nameopt RFC2253 | sed 's/subject=//g')
# x509 auth
mongod --dbpath data/x509 --fork --logpath /var/log/mongodb_x509/mongod.log --bind_ip 0.0.0.0 --port 27019
mongo localhost:27019/admin --eval "db.getSiblingDB(\"\$external\").runCommand({createUser:\"$dn\",roles:[{role:\"root\",db:\"admin\"}]})"
mongo localhost:27019/admin --eval "db.shutdownServer()"
mongod --dbpath data/x509 --fork --logpath /var/log/mongodb_x509/mongod.log --auth --setParameter authenticationMechanisms=MONGODB-X509 --tlsMode preferTLS --tlsCAFile certs/cacert.pem --tlsCertificateKeyFile private/server.pem --bind_ip 0.0.0.0 --port 27019
# x509 auth short expirey
# mongodb will not start with an expired certificate. service must be started before certificate expires. tests should be run after certificate expiry
mongod --dbpath data/x509_expire --fork --logpath /var/log/mongodb_x509_expire/mongod.log --bind_ip 0.0.0.0 --port 27020
mongo localhost:27020/admin --eval "db.getSiblingDB(\"\$external\").runCommand({createUser:\"$dn\",roles:[{role:\"root\",db:\"admin\"}]})"
mongo localhost:27020/admin --eval "db.shutdownServer()"
mongod --dbpath data/x509_expire --fork --logpath /var/log/mongodb_x509_expire/mongod.log --auth --setParameter authenticationMechanisms=MONGODB-X509 --tlsMode preferTLS --tlsCAFile certs/cacert.pem --tlsCertificateKeyFile private/serverexp.pem --bind_ip 0.0.0.0 --port 27020
# note about key size and mongodb
# x509 must be 2048 bytes or stronger in order for mongodb to start. otherwise you will receive similar error below
# {"keyFile":"/opt/private/server.pem","error":"error:140AB18F:SSL routines:SSL_CTX_use_certificate:ee key too small"}
# copy key files to /opt/export. docker volume should point /opt/export to outputs/mongodb/dev in order to run non short x509 tests
cp /opt/certs/cacert.pem /opt/private/client.pem /opt/private/clientenc.pem /opt/export
while true; do sleep 1; done # leave container running.

View File

@ -0,0 +1,253 @@
package mongodb
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
func (s *MongoDB) getCollections(ctx context.Context) error {
s.collections = map[string]bson.M{}
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
if err != nil {
return fmt.Errorf("unable to execute ListCollections: %v", err)
}
for collections.Next(ctx) {
var collection bson.M
if err := collections.Decode(&collection); err != nil {
return fmt.Errorf("unable to decode ListCollections: %v", err)
}
name, ok := collection["name"].(string)
if !ok {
return fmt.Errorf("non-string name in %v", collection)
}
s.collections[name] = collection
}
return nil
}
func (s *MongoDB) insertDocument(ctx context.Context, databaseCollection string, bdoc bson.D) error {
collection := s.client.Database(s.MetricDatabase).Collection(databaseCollection)
_, err := collection.InsertOne(ctx, &bdoc)
return err
}
type MongoDB struct {
Dsn string `toml:"dsn"`
AuthenticationType string `toml:"authentication"`
MetricDatabase string `toml:"database"`
MetricGranularity string `toml:"granularity"`
Username string `toml:"username"`
Password string `toml:"password"`
ServerSelectTimeout config.Duration `toml:"timeout"`
TTL config.Duration `toml:"ttl"`
Log telegraf.Logger `toml:"-"`
client *mongo.Client
clientOptions *options.ClientOptions
collections map[string]bson.M
tls.ClientConfig
}
func (s *MongoDB) Description() string {
return "Sends metrics to MongoDB"
}
var sampleConfig = `
# connection string examples for mongodb
dsn = "mongodb://localhost:27017"
# dsn = "mongodb://mongod1:27017,mongod2:27017,mongod3:27017/admin&replicaSet=myReplSet&w=1"
# overrides serverSelectionTimeoutMS in dsn if set
# timeout = "30s"
# default authentication, optional
# authentication = "NONE"
# for SCRAM-SHA-256 authentication
# authentication = "SCRAM"
# username = "root"
# password = "***"
# for x509 certificate authentication
# authentication = "X509"
# tls_ca = "ca.pem"
# tls_key = "client.pem"
# # tls_key_pwd = "changeme" # required for encrypted tls_key
# insecure_skip_verify = false
# database to store measurements and time series collections
# database = "telegraf"
# granularity can be seconds, minutes, or hours.
# configuring this value will be based on your input collection frequency.
# see https://docs.mongodb.com/manual/core/timeseries-collections/#create-a-time-series-collection
# granularity = "seconds"
# optionally set a TTL to automatically expire documents from the measurement collections.
# ttl = "360h"
`
func (s *MongoDB) SampleConfig() string {
return sampleConfig
}
func (s *MongoDB) Init() error {
if s.MetricDatabase == "" {
s.MetricDatabase = "telegraf"
}
switch s.MetricGranularity {
case "":
s.MetricGranularity = "seconds"
case "seconds", "minutes", "hours":
default:
return fmt.Errorf("invalid time series collection granularity. please specify \"seconds\", \"minutes\", or \"hours\"")
}
// do some basic Dsn checks
if !strings.HasPrefix(s.Dsn, "mongodb://") && !strings.HasPrefix(s.Dsn, "mongodb+srv://") {
return fmt.Errorf("invalid connection string. expected mongodb://host:port/?{options} or mongodb+srv://host:port/?{options}")
}
if !strings.Contains(s.Dsn[strings.Index(s.Dsn, "://")+3:], "/") { //append '/' to Dsn if its missing
s.Dsn = s.Dsn + "/"
}
serverAPIOptions := options.ServerAPI(options.ServerAPIVersion1) //use new mongodb versioned api
s.clientOptions = options.Client().SetServerAPIOptions(serverAPIOptions)
switch s.AuthenticationType {
case "SCRAM":
if s.Username == "" {
return fmt.Errorf("SCRAM authentication must specify a username")
}
if s.Password == "" {
return fmt.Errorf("SCRAM authentication must specify a password")
}
credential := options.Credential{
AuthMechanism: "SCRAM-SHA-256",
Username: s.Username,
Password: s.Password,
}
s.clientOptions.SetAuth(credential)
case "X509":
//format connection string to include tls/x509 options
newConnectionString, err := url.Parse(s.Dsn)
if err != nil {
return err
}
q := newConnectionString.Query()
q.Set("tls", "true")
if s.InsecureSkipVerify {
q.Set("tlsInsecure", strconv.FormatBool(s.InsecureSkipVerify))
}
if s.TLSCA != "" {
q.Set("tlsCAFile", s.TLSCA)
}
q.Set("sslClientCertificateKeyFile", s.TLSKey)
if s.TLSKeyPwd != "" {
q.Set("sslClientCertificateKeyPassword", s.TLSKeyPwd)
}
newConnectionString.RawQuery = q.Encode()
s.Dsn = newConnectionString.String()
// always auth source $external
credential := options.Credential{
AuthSource: "$external",
AuthMechanism: "MONGODB-X509",
}
s.clientOptions.SetAuth(credential)
}
if s.ServerSelectTimeout != 0 {
s.clientOptions.SetServerSelectionTimeout(time.Duration(s.ServerSelectTimeout))
}
s.clientOptions.ApplyURI(s.Dsn)
return nil
}
func (s *MongoDB) createTimeSeriesCollection(databaseCollection string) error {
_, collectionExists := s.collections[databaseCollection]
if !collectionExists {
ctx := context.Background()
tso := options.TimeSeries()
tso.SetTimeField("timestamp")
tso.SetMetaField("tags")
tso.SetGranularity(s.MetricGranularity)
cco := options.CreateCollection()
if s.TTL != 0 {
cco.SetExpireAfterSeconds(int64(time.Duration(s.TTL).Seconds()))
}
cco.SetTimeSeriesOptions(tso)
err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, databaseCollection, cco)
if err != nil {
return fmt.Errorf("unable to create time series collection: %v", err)
}
s.collections[databaseCollection] = bson.M{}
}
return nil
}
func (s *MongoDB) Connect() error {
ctx := context.Background()
client, err := mongo.Connect(ctx, s.clientOptions)
if err != nil {
return fmt.Errorf("unable to connect: %v", err)
}
s.client = client
if err := s.getCollections(ctx); err != nil {
return fmt.Errorf("unable to get collections from specified metric database: %v", err)
}
return nil
}
func (s *MongoDB) Close() error {
ctx := context.Background()
return s.client.Disconnect(ctx)
}
// all metric/measurement fields are parent level of document
// metadata field is named "tags"
// mongodb stores timestamp as UTC. conversion should be performed during reads in app or in aggregation pipeline
func marshalMetric(metric telegraf.Metric) bson.D {
var bdoc bson.D
for k, v := range metric.Fields() {
bdoc = append(bdoc, primitive.E{Key: k, Value: v})
}
var tags bson.D
for k, v := range metric.Tags() {
tags = append(tags, primitive.E{Key: k, Value: v})
}
bdoc = append(bdoc, primitive.E{Key: "tags", Value: tags})
bdoc = append(bdoc, primitive.E{Key: "timestamp", Value: metric.Time()})
return bdoc
}
func (s *MongoDB) Write(metrics []telegraf.Metric) error {
ctx := context.Background()
for _, metric := range metrics {
if err := s.createTimeSeriesCollection(metric.Name()); err != nil {
return err
}
bdoc := marshalMetric(metric)
if err := s.insertDocument(ctx, metric.Name(), bdoc); err != nil {
return err
}
}
return nil
}
func init() {
outputs.Add("mongodb", func() telegraf.Output { return &MongoDB{} })
}

View File

@ -0,0 +1,352 @@
package mongodb
import (
"testing"
"time"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWriteIntegrationNoAuth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
plugin := &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "NONE",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
}
// validate config
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
require.NoError(t, plugin.Write(testutil.MockMetrics()))
require.NoError(t, plugin.Close())
}
func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
name string
plugin *MongoDB
connErrFunc func(t *testing.T, err error)
}{
{
name: "success with scram authentication",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27018/admin",
AuthenticationType: "SCRAM",
Username: "root",
Password: "changeme",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
},
connErrFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "fail with scram authentication bad password",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27018/admin",
AuthenticationType: "SCRAM",
Username: "root",
Password: "root",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
},
connErrFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// validate config
err := tt.plugin.Init()
require.NoError(t, err)
if err == nil {
// connect
err = tt.plugin.Connect()
tt.connErrFunc(t, err)
if err == nil {
// insert mock metrics
err = tt.plugin.Write(testutil.MockMetrics())
require.NoError(t, err)
// cleanup
err = tt.plugin.Close()
require.NoError(t, err)
}
}
})
}
}
func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
name string
plugin *MongoDB
connErrFunc func(t *testing.T, err error)
}{
{
name: "success with x509 authentication",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSCA: "dev/cacert.pem",
TLSKey: "dev/client.pem",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "success with x509 authentication using encrypted key file",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSCA: "dev/cacert.pem",
TLSKey: "dev/clientenc.pem",
TLSKeyPwd: "changeme",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "success with x509 authentication missing ca and using insceure tls",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSKey: "dev/client.pem",
InsecureSkipVerify: true,
},
},
connErrFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "fail with x509 authentication missing ca",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSKey: "dev/client.pem",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "fail with x509 authentication using encrypted key file",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSCA: "dev/cacert.pem",
TLSKey: "dev/clientenc.pem",
TLSKeyPwd: "badpassword",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "fail with x509 authentication using invalid ca",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSCA: "dev/client.pem",
TLSKey: "dev/client.pem",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "fail with x509 authentication using invalid key",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27019",
AuthenticationType: "X509",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
ServerSelectTimeout: config.Duration(time.Duration(5) * time.Second),
TTL: config.Duration(time.Duration(5) * time.Minute),
ClientConfig: tls.ClientConfig{
TLSCA: "dev/cacert.pem",
TLSKey: "dev/cacert.pem",
InsecureSkipVerify: false,
},
},
connErrFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// validate config
err := tt.plugin.Init()
require.NoError(t, err)
if err == nil {
// connect
err = tt.plugin.Connect()
tt.connErrFunc(t, err)
if err == nil {
// insert mock metrics
err = tt.plugin.Write(testutil.MockMetrics())
require.NoError(t, err)
// cleanup
err = tt.plugin.Close()
require.NoError(t, err)
}
}
})
}
}
func TestConfiguration(t *testing.T) {
tests := []struct {
name string
plugin *MongoDB
errFunc func(t *testing.T, err error)
}{
{
name: "fail with invalid connection string",
plugin: &MongoDB{
Dsn: "asdf1234",
AuthenticationType: "NONE",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
TTL: config.Duration(time.Duration(5) * time.Minute),
},
},
{
name: "fail with invalid metric granularity",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "NONE",
MetricDatabase: "telegraf_test",
MetricGranularity: "somerandomgranularitythatdoesntwork",
},
},
{
name: "fail with scram authentication missing username field",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "SCRAM",
Password: "somerandompasswordthatwontwork",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
},
},
{
name: "fail with scram authentication missing password field",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "SCRAM",
Username: "somerandomusernamethatwontwork",
MetricDatabase: "telegraf_test",
MetricGranularity: "seconds",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// validate config
err := tt.plugin.Init()
require.Error(t, err)
})
}
tests = []struct {
name string
plugin *MongoDB
errFunc func(t *testing.T, err error)
}{
{
name: "success init with missing metric database",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "NONE",
MetricGranularity: "seconds",
},
},
{
name: "success init missing metric granularity",
plugin: &MongoDB{
Dsn: "mongodb://localhost:27017",
AuthenticationType: "NONE",
MetricDatabase: "telegraf_test",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// validate config
err := tt.plugin.Init()
require.NoError(t, err)
})
}
}

28
testutil/pki/client.pem Normal file
View File

@ -0,0 +1,28 @@
-----BEGIN CERTIFICATE-----
MIIB+TCCAWKgAwIBAgIBAjANBgkqhkiG9w0BAQsFADAbMRkwFwYDVQQDDBBUZWxl
Z3JhZiBUZXN0IENBMB4XDTE4MDUwMzAxMDUyOVoXDTI4MDQzMDAxMDUyOVowHTEb
MBkGA1UEAwwSY2xpZW50LmxvY2FsZG9tYWluMIGfMA0GCSqGSIb3DQEBAQUAA4GN
ADCBiQKBgQDX7Plvu0MJtA9TrusYtQnAogsdiYJZd9wfFIjH5FxE3SWJ4KAIE+yR
WRqcqX8XnpieQLaNsfXhDPWLkWngTDydk4NO/jlAQk0e6+9+NeiZ2ViIHmtXERb9
CyiiWUmo+YCd69lhzSEIMK9EPBSDHQTgQMtEfGak03G5rx3MCakE1QIDAQABo0sw
STAJBgNVHRMEAjAAMAsGA1UdDwQEAwIHgDAaBgNVHREEEzARgglsb2NhbGhvc3SH
BH8AAAEwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDQYJKoZIhvcNAQELBQADgYEAVry0
L07oTN+FMLncY/Be9BzFB3b3mnbxbZr58OgI4WHuOeYBuvDI033FIIIzpwb8XYpG
HJkZlSbviqq19lAh/Cktl35BCNrA6Uc+dgW7QWhnYS2tZandVTo/8FFstJTNiiLw
uiz/Hr3mRXUIDi5OygJHY1IZr8hFTOOJY+0ws3E=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDX7Plvu0MJtA9TrusYtQnAogsdiYJZd9wfFIjH5FxE3SWJ4KAI
E+yRWRqcqX8XnpieQLaNsfXhDPWLkWngTDydk4NO/jlAQk0e6+9+NeiZ2ViIHmtX
ERb9CyiiWUmo+YCd69lhzSEIMK9EPBSDHQTgQMtEfGak03G5rx3MCakE1QIDAQAB
AoGAOjRU4Lt3zKvO3d3u3ZAfet+zY1jn3DolCfO9EzUJcj6ymcIFIWhNgrikJcrC
yZkkxrPnAbcQ8oNNxTuDcMTcKZbnyUnlQj5NtVuty5Q+zgf3/Q2pRhaE+TwrpOJ+
ETtVp9R/PrPN2NC5wPo289fPNWFYkd4DPbdWZp5AJHz1XYECQQD3kKpinJxMYp9F
Q1Qj1OkxGln0KPgdqRYjjW/rXI4/hUodfg+xXWHPFSGj3AgEjQIvuengbOAeH3qo
wF1uxVTlAkEA30hXM3EbboMCDQzNRNkkV9EiZ0MZXhj1aIGl+sQZOmOeFdcdjGkD
dsA42nmaYqXCD9KAvc+S/tGJaa0Qg0VhMQJAb2+TAqh0Qn3yK39PFIH2JcAy1ZDL
fq5p5L75rfwPm9AnuHbSIYhjSo+8gMG+ai3+2fTZrcfUajrJP8S3SfFRcQJBANQQ
POHatxcKzlPeqMaPBXlyY553mAxK4CnVmPLGdL+EBYzwtlu5EVUj09uMSxkOHXYx
k5yzHQVvtXbsrBZBOsECQBJLlkMjJmXrIIdLPmHQWL3bm9MMg1PqzupSEwz6cyrG
uIIm/X91pDyxCHaKYWp38FXBkYAgohI8ow5/sgRvU5w=
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIB+TCCAWKgAwIBAgIBAjANBgkqhkiG9w0BAQsFADAbMRkwFwYDVQQDDBBUZWxl
Z3JhZiBUZXN0IENBMB4XDTE4MDUwMzAxMDUyOVoXDTI4MDQzMDAxMDUyOVowHTEb
MBkGA1UEAwwSY2xpZW50LmxvY2FsZG9tYWluMIGfMA0GCSqGSIb3DQEBAQUAA4GN
ADCBiQKBgQDX7Plvu0MJtA9TrusYtQnAogsdiYJZd9wfFIjH5FxE3SWJ4KAIE+yR
WRqcqX8XnpieQLaNsfXhDPWLkWngTDydk4NO/jlAQk0e6+9+NeiZ2ViIHmtXERb9
CyiiWUmo+YCd69lhzSEIMK9EPBSDHQTgQMtEfGak03G5rx3MCakE1QIDAQABo0sw
STAJBgNVHRMEAjAAMAsGA1UdDwQEAwIHgDAaBgNVHREEEzARgglsb2NhbGhvc3SH
BH8AAAEwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDQYJKoZIhvcNAQELBQADgYEAVry0
L07oTN+FMLncY/Be9BzFB3b3mnbxbZr58OgI4WHuOeYBuvDI033FIIIzpwb8XYpG
HJkZlSbviqq19lAh/Cktl35BCNrA6Uc+dgW7QWhnYS2tZandVTo/8FFstJTNiiLw
uiz/Hr3mRXUIDi5OygJHY1IZr8hFTOOJY+0ws3E=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-128-CBC,E07764654058094DE0846DF015F8CD79
PdLqVcSk+zB6F8Cbgx7PmyXFvIhcQHQcM4zsuVTSdvTdtrpDk82wLxPTVIU6D7p5
cqodMKv7xLUV2BSqGfIbSlMHyT6rFskjpZWPUSS9hQ9YlWqsoNflTMT33pNz8eMA
mYj9JlFImRq8o3E9rV2bdaFnt+UwvabPnGWW3EC3PDZRXNNFddu62X0Iip24vy/g
L5hOqkSN9l+m72wvfw0RwdTT8RMCoug+RKD/g2lUJ9l1//UhWV5Urte/cQA7l+6W
ntWzI9hwh1NheO552bOEuroMk9sjWRsYYBRkCp1JJsy+lUxZILQfoC0YP6uroVZT
TWDeWqQ839LYEJHFIZGp5fu1N/Km2HfwctelHwmJmbEMveVKaOv7TdOCjfX0fg8E
fiEvyUCZ3C/vgtZE0U4FZEaOmlGHY6VyylJmMZ20MWz9tsLJNf4GXBdaiMeD7huW
90xdbkncidRtZ/wWBPeqetP/brMu/3+1CMk66kBqVAEnw9pIxL5E3jivxMHHK9Ql
5nFJ+9epgV8wJDrTuVxqLsat/GnqfYcUPcvNgGkghblnJUdQnbM/3mBZCuuVhoMk
+Ggy3ryRiv5pUsgsriOBvZ+mGgx8IlYX8v+wSQEWuA7c/+0ylAPmqyD1B9AK5l6D
KjCxmd8/oiTlhqXZe1Z023p6+12Y+DFjGAfr5S81OwIUV6Txp5IevYdtCAs1OaDT
3F3jeWwOqbfDsXluaTc7J4SxaL4QN/CUI4ag1s0ul2Yj6giTP5g1H85XoGxjk/zN
smmRYOrmUyjChoa10wPSq9BirZ4bETnvj7OgcENaScrPmzG+8Ht6+sk5cRj+sVkv
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,18 @@
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-128-CBC,E07764654058094DE0846DF015F8CD79
PdLqVcSk+zB6F8Cbgx7PmyXFvIhcQHQcM4zsuVTSdvTdtrpDk82wLxPTVIU6D7p5
cqodMKv7xLUV2BSqGfIbSlMHyT6rFskjpZWPUSS9hQ9YlWqsoNflTMT33pNz8eMA
mYj9JlFImRq8o3E9rV2bdaFnt+UwvabPnGWW3EC3PDZRXNNFddu62X0Iip24vy/g
L5hOqkSN9l+m72wvfw0RwdTT8RMCoug+RKD/g2lUJ9l1//UhWV5Urte/cQA7l+6W
ntWzI9hwh1NheO552bOEuroMk9sjWRsYYBRkCp1JJsy+lUxZILQfoC0YP6uroVZT
TWDeWqQ839LYEJHFIZGp5fu1N/Km2HfwctelHwmJmbEMveVKaOv7TdOCjfX0fg8E
fiEvyUCZ3C/vgtZE0U4FZEaOmlGHY6VyylJmMZ20MWz9tsLJNf4GXBdaiMeD7huW
90xdbkncidRtZ/wWBPeqetP/brMu/3+1CMk66kBqVAEnw9pIxL5E3jivxMHHK9Ql
5nFJ+9epgV8wJDrTuVxqLsat/GnqfYcUPcvNgGkghblnJUdQnbM/3mBZCuuVhoMk
+Ggy3ryRiv5pUsgsriOBvZ+mGgx8IlYX8v+wSQEWuA7c/+0ylAPmqyD1B9AK5l6D
KjCxmd8/oiTlhqXZe1Z023p6+12Y+DFjGAfr5S81OwIUV6Txp5IevYdtCAs1OaDT
3F3jeWwOqbfDsXluaTc7J4SxaL4QN/CUI4ag1s0ul2Yj6giTP5g1H85XoGxjk/zN
smmRYOrmUyjChoa10wPSq9BirZ4bETnvj7OgcENaScrPmzG+8Ht6+sk5cRj+sVkv
-----END RSA PRIVATE KEY-----

28
testutil/pki/server.pem Normal file
View File

@ -0,0 +1,28 @@
-----BEGIN CERTIFICATE-----
MIIB+TCCAWKgAwIBAgIBATANBgkqhkiG9w0BAQsFADAbMRkwFwYDVQQDDBBUZWxl
Z3JhZiBUZXN0IENBMB4XDTE4MDUwMzAxMDUyOVoXDTI4MDQzMDAxMDUyOVowHTEb
MBkGA1UEAwwSc2VydmVyLmxvY2FsZG9tYWluMIGfMA0GCSqGSIb3DQEBAQUAA4GN
ADCBiQKBgQDTBmLJ0pBFUxnPkkx38sBnOKvs+OinVqxTnVcc1iCyQJQleB37uY6D
L55mSsPvnad/oDpyGpHt4RVtrhmyC6ptSrWLyk7mraeAo30Cooqr5tA9A+6yj0ij
ySLlYimTMQy8tbnVNWLwKbxgT9N4NlUzwyqxLWUMfRzLfmefqzk5bQIDAQABo0sw
STAJBgNVHRMEAjAAMBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATALBgNVHQ8E
BAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDQYJKoZIhvcNAQELBQADgYEATNnM
ol0s29lJ+WkP+HUFtKaXxQ+kXLADqfhsk2G1/kZAVRHsYUDlJ+GkHnWIHlg/ggIP
JS+z44iwMPOtzJQI7MvAFYVKpYAEdIFTjXf6GafLjUfoXYi0vwHoVJHtQu3Kpm9L
Ugm02h0ycIadN8RdWAAFUf6XpVKUJa0YYLuyaXY=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDTBmLJ0pBFUxnPkkx38sBnOKvs+OinVqxTnVcc1iCyQJQleB37
uY6DL55mSsPvnad/oDpyGpHt4RVtrhmyC6ptSrWLyk7mraeAo30Cooqr5tA9A+6y
j0ijySLlYimTMQy8tbnVNWLwKbxgT9N4NlUzwyqxLWUMfRzLfmefqzk5bQIDAQAB
AoGBALWQAgFJxM2QwV1hr59oYnitPudmBa6smRpb/q6V4Y3cmFpgrdN+hIqEtxGl
9E0+5PWfI4o3KCV2itxSdlNFTDyqTZkM+BT8PPKISzAewkdqnKjbWgAmluzOJH4O
hc1zBfIOuT5+cfx5JR5/j9BhWVC7BJ+EiREkd/Z8ZnAMeItVAkEA8bhcC+8luiFQ
6kytXx2XfbKKh4Q99+KEQHqSGeuHZOcnWfjX99jo67CIxpwBRENslpZOw78fBmi4
4kf8j+dgLwJBAN99zyRxYzKc8TSsy/fF+3V/Ex75HYGGS/eOWcwPFXpGNA63hIa8
fJ/2pDnLzCqLZ9vWdBF39NtkacJS7bo6XSMCQQCZgN2bipSn3k53bJhRJga1gXOt
2dJMoGIiXHR513QVJSJ9ZaUpNWu9eU9y6VF4m2TTQMLmVnIKbOi0csi2TlZrAkAi
7URsC5RXGpPPiZmutTAhIqTYWFI2JcjFfWenLkxK+aG1ExURAW/wh9kOdz0HARZQ
Eum8uSR5DO5CQjeIvQpFAkAgZJXAwRxuts/p1EoLuPCJTaDkIY2vc0AJzzr5nuAs
pyjnLYCYqSBUJ+3nDDBqNYpgxCJddzmjNxGuO7mef9Ue
-----END RSA PRIVATE KEY-----

View File

@ -4,6 +4,7 @@ mkdir certs certs_by_serial private &&
chmod 700 private &&
echo 01 > ./serial &&
touch ./index.txt &&
echo 'unique_subject = no' > index.txt.attr
cat >./openssl.conf <<EOF
[ ca ]
default_ca = telegraf_ca
@ -63,14 +64,23 @@ extendedKeyUsage = 1.3.6.1.5.5.7.3.1
DNS.1 = localhost
IP.1 = 127.0.0.1
EOF
openssl req -x509 -config ./openssl.conf -days 3650 -newkey rsa:1024 -out ./certs/cacert.pem -keyout ./private/cakey.pem -subj "/CN=Telegraf Test CA/" -nodes &&
openssl req -x509 -config ./openssl.conf -days 3650 -newkey rsa:2048 -out ./certs/cacert.pem -keyout ./private/cakey.pem -subj "/CN=Telegraf Test CA/" -nodes &&
# Create server keypair
openssl genrsa -out ./private/serverkey.pem 1024 &&
openssl req -new -key ./private/serverkey.pem -out ./certs/servercsr.pem -outform PEM -subj "/CN=server.localdomain/O=server/" &&
# Create server and soon to expire keypair
openssl genrsa -out ./private/serverkey.pem 2048 &&
openssl req -new -key ./private/serverkey.pem -out ./certs/servercsr.pem -outform PEM -subj "/CN=$(cat /proc/sys/kernel/hostname)/O=server/" &&
openssl ca -config ./openssl.conf -in ./certs/servercsr.pem -out ./certs/servercert.pem -notext -batch -extensions server_ca_extensions &&
openssl ca -config ./openssl.conf -in ./certs/servercsr.pem -out ./certs/servercertexp.pem -startdate $(date +%y%m%d%H%M00 --date='-5 minutes')'Z' -enddate $(date +%y%m%d%H%M00 --date='5 minutes')'Z' -notext -batch -extensions server_ca_extensions &&
# Create client keypair
openssl genrsa -out ./private/clientkey.pem 1024 &&
openssl req -new -key ./private/clientkey.pem -out ./certs/clientcsr.pem -outform PEM -subj "/CN=client.localdomain/O=client/" &&
openssl ca -config ./openssl.conf -in ./certs/clientcsr.pem -out ./certs/clientcert.pem -notext -batch -extensions client_ca_extensions
# Create client and client encrypted keypair
openssl genrsa -out ./private/clientkey.pem 2048 &&
openssl req -new -key ./private/clientkey.pem -out ./certs/clientcsr.pem -outform PEM -subj "/CN=$(cat /proc/sys/kernel/hostname)/O=client/" &&
openssl ca -config ./openssl.conf -in ./certs/clientcsr.pem -out ./certs/clientcert.pem -notext -batch -extensions client_ca_extensions &&
cp ./private/clientkey.pem ./private/clientkeyenc.pem &&
ssh-keygen -p -f ./private/clientkeyenc.pem -m PEM -N 'changeme'
# Combine crt and key to create pem formatted keyfile
cat ./certs/clientcert.pem ./private/clientkey.pem > ./private/client.pem &&
cat ./certs/clientcert.pem ./private/clientkeyenc.pem > ./private/clientenc.pem &&
cat ./certs/servercert.pem ./private/serverkey.pem > ./private/server.pem &&
cat ./certs/servercertexp.pem ./private/serverkey.pem > ./private/serverexp.pem

View File

@ -72,6 +72,18 @@ func (p *pki) ClientKeyPath() string {
return path.Join(p.path, "clientkey.pem")
}
func (p *pki) ClientCertAndKeyPath() string {
return path.Join(p.path, "client.pem")
}
func (p *pki) ClientEncKeyPath() string {
return path.Join(p.path, "clientkeyenc.pem")
}
func (p *pki) ClientCertAndEncKeyPath() string {
return path.Join(p.path, "clientenc.pem")
}
func (p *pki) ReadServerCert() string {
return readCertificate(p.ServerCertPath())
}
@ -88,6 +100,10 @@ func (p *pki) ServerKeyPath() string {
return path.Join(p.path, "serverkey.pem")
}
func (p *pki) ServerCertAndKeyPath() string {
return path.Join(p.path, "server.pem")
}
func readCertificate(filename string) string {
file, err := os.Open(filename)
if err != nil {