254 lines
7.5 KiB
Go
254 lines
7.5 KiB
Go
package mongodb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
)
|
|
|
|
func (s *MongoDB) getCollections(ctx context.Context) error {
|
|
s.collections = map[string]bson.M{}
|
|
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to execute ListCollections: %v", err)
|
|
}
|
|
for collections.Next(ctx) {
|
|
var collection bson.M
|
|
if err := collections.Decode(&collection); err != nil {
|
|
return fmt.Errorf("unable to decode ListCollections: %v", err)
|
|
}
|
|
name, ok := collection["name"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("non-string name in %v", collection)
|
|
}
|
|
s.collections[name] = collection
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoDB) insertDocument(ctx context.Context, databaseCollection string, bdoc bson.D) error {
|
|
collection := s.client.Database(s.MetricDatabase).Collection(databaseCollection)
|
|
_, err := collection.InsertOne(ctx, &bdoc)
|
|
return err
|
|
}
|
|
|
|
type MongoDB struct {
|
|
Dsn string `toml:"dsn"`
|
|
AuthenticationType string `toml:"authentication"`
|
|
MetricDatabase string `toml:"database"`
|
|
MetricGranularity string `toml:"granularity"`
|
|
Username string `toml:"username"`
|
|
Password string `toml:"password"`
|
|
ServerSelectTimeout config.Duration `toml:"timeout"`
|
|
TTL config.Duration `toml:"ttl"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
client *mongo.Client
|
|
clientOptions *options.ClientOptions
|
|
collections map[string]bson.M
|
|
tls.ClientConfig
|
|
}
|
|
|
|
func (s *MongoDB) Description() string {
|
|
return "Sends metrics to MongoDB"
|
|
}
|
|
|
|
var sampleConfig = `
|
|
# connection string examples for mongodb
|
|
dsn = "mongodb://localhost:27017"
|
|
# dsn = "mongodb://mongod1:27017,mongod2:27017,mongod3:27017/admin&replicaSet=myReplSet&w=1"
|
|
|
|
# overrides serverSelectionTimeoutMS in dsn if set
|
|
# timeout = "30s"
|
|
|
|
# default authentication, optional
|
|
# authentication = "NONE"
|
|
|
|
# for SCRAM-SHA-256 authentication
|
|
# authentication = "SCRAM"
|
|
# username = "root"
|
|
# password = "***"
|
|
|
|
# for x509 certificate authentication
|
|
# authentication = "X509"
|
|
# tls_ca = "ca.pem"
|
|
# tls_key = "client.pem"
|
|
# # tls_key_pwd = "changeme" # required for encrypted tls_key
|
|
# insecure_skip_verify = false
|
|
|
|
# database to store measurements and time series collections
|
|
# database = "telegraf"
|
|
|
|
# granularity can be seconds, minutes, or hours.
|
|
# configuring this value will be based on your input collection frequency.
|
|
# see https://docs.mongodb.com/manual/core/timeseries-collections/#create-a-time-series-collection
|
|
# granularity = "seconds"
|
|
|
|
# optionally set a TTL to automatically expire documents from the measurement collections.
|
|
# ttl = "360h"
|
|
`
|
|
|
|
func (s *MongoDB) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (s *MongoDB) Init() error {
|
|
if s.MetricDatabase == "" {
|
|
s.MetricDatabase = "telegraf"
|
|
}
|
|
switch s.MetricGranularity {
|
|
case "":
|
|
s.MetricGranularity = "seconds"
|
|
case "seconds", "minutes", "hours":
|
|
default:
|
|
return fmt.Errorf("invalid time series collection granularity. please specify \"seconds\", \"minutes\", or \"hours\"")
|
|
}
|
|
|
|
// do some basic Dsn checks
|
|
if !strings.HasPrefix(s.Dsn, "mongodb://") && !strings.HasPrefix(s.Dsn, "mongodb+srv://") {
|
|
return fmt.Errorf("invalid connection string. expected mongodb://host:port/?{options} or mongodb+srv://host:port/?{options}")
|
|
}
|
|
if !strings.Contains(s.Dsn[strings.Index(s.Dsn, "://")+3:], "/") { //append '/' to Dsn if its missing
|
|
s.Dsn = s.Dsn + "/"
|
|
}
|
|
|
|
serverAPIOptions := options.ServerAPI(options.ServerAPIVersion1) //use new mongodb versioned api
|
|
s.clientOptions = options.Client().SetServerAPIOptions(serverAPIOptions)
|
|
|
|
switch s.AuthenticationType {
|
|
case "SCRAM":
|
|
if s.Username == "" {
|
|
return fmt.Errorf("SCRAM authentication must specify a username")
|
|
}
|
|
if s.Password == "" {
|
|
return fmt.Errorf("SCRAM authentication must specify a password")
|
|
}
|
|
credential := options.Credential{
|
|
AuthMechanism: "SCRAM-SHA-256",
|
|
Username: s.Username,
|
|
Password: s.Password,
|
|
}
|
|
s.clientOptions.SetAuth(credential)
|
|
case "X509":
|
|
//format connection string to include tls/x509 options
|
|
newConnectionString, err := url.Parse(s.Dsn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
q := newConnectionString.Query()
|
|
q.Set("tls", "true")
|
|
if s.InsecureSkipVerify {
|
|
q.Set("tlsInsecure", strconv.FormatBool(s.InsecureSkipVerify))
|
|
}
|
|
if s.TLSCA != "" {
|
|
q.Set("tlsCAFile", s.TLSCA)
|
|
}
|
|
q.Set("sslClientCertificateKeyFile", s.TLSKey)
|
|
if s.TLSKeyPwd != "" {
|
|
q.Set("sslClientCertificateKeyPassword", s.TLSKeyPwd)
|
|
}
|
|
newConnectionString.RawQuery = q.Encode()
|
|
s.Dsn = newConnectionString.String()
|
|
// always auth source $external
|
|
credential := options.Credential{
|
|
AuthSource: "$external",
|
|
AuthMechanism: "MONGODB-X509",
|
|
}
|
|
s.clientOptions.SetAuth(credential)
|
|
}
|
|
|
|
if s.ServerSelectTimeout != 0 {
|
|
s.clientOptions.SetServerSelectionTimeout(time.Duration(s.ServerSelectTimeout))
|
|
}
|
|
|
|
s.clientOptions.ApplyURI(s.Dsn)
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoDB) createTimeSeriesCollection(databaseCollection string) error {
|
|
_, collectionExists := s.collections[databaseCollection]
|
|
if !collectionExists {
|
|
ctx := context.Background()
|
|
tso := options.TimeSeries()
|
|
tso.SetTimeField("timestamp")
|
|
tso.SetMetaField("tags")
|
|
tso.SetGranularity(s.MetricGranularity)
|
|
cco := options.CreateCollection()
|
|
if s.TTL != 0 {
|
|
cco.SetExpireAfterSeconds(int64(time.Duration(s.TTL).Seconds()))
|
|
}
|
|
cco.SetTimeSeriesOptions(tso)
|
|
err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, databaseCollection, cco)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create time series collection: %v", err)
|
|
}
|
|
s.collections[databaseCollection] = bson.M{}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoDB) Connect() error {
|
|
ctx := context.Background()
|
|
client, err := mongo.Connect(ctx, s.clientOptions)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to connect: %v", err)
|
|
}
|
|
s.client = client
|
|
if err := s.getCollections(ctx); err != nil {
|
|
return fmt.Errorf("unable to get collections from specified metric database: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *MongoDB) Close() error {
|
|
ctx := context.Background()
|
|
return s.client.Disconnect(ctx)
|
|
}
|
|
|
|
// all metric/measurement fields are parent level of document
|
|
// metadata field is named "tags"
|
|
// mongodb stores timestamp as UTC. conversion should be performed during reads in app or in aggregation pipeline
|
|
func marshalMetric(metric telegraf.Metric) bson.D {
|
|
var bdoc bson.D
|
|
for k, v := range metric.Fields() {
|
|
bdoc = append(bdoc, primitive.E{Key: k, Value: v})
|
|
}
|
|
var tags bson.D
|
|
for k, v := range metric.Tags() {
|
|
tags = append(tags, primitive.E{Key: k, Value: v})
|
|
}
|
|
bdoc = append(bdoc, primitive.E{Key: "tags", Value: tags})
|
|
bdoc = append(bdoc, primitive.E{Key: "timestamp", Value: metric.Time()})
|
|
return bdoc
|
|
}
|
|
|
|
func (s *MongoDB) Write(metrics []telegraf.Metric) error {
|
|
ctx := context.Background()
|
|
for _, metric := range metrics {
|
|
if err := s.createTimeSeriesCollection(metric.Name()); err != nil {
|
|
return err
|
|
}
|
|
bdoc := marshalMetric(metric)
|
|
if err := s.insertDocument(ctx, metric.Name(), bdoc); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("mongodb", func() telegraf.Output { return &MongoDB{} })
|
|
}
|