feat(secretstores): add support for additional input plugins (#12067)
This commit is contained in:
parent
aa2b6947ff
commit
63ab159481
|
|
@ -2,15 +2,17 @@ package kafka
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
)
|
||||
|
||||
type SASLAuth struct {
|
||||
SASLUsername string `toml:"sasl_username"`
|
||||
SASLPassword string `toml:"sasl_password"`
|
||||
SASLMechanism string `toml:"sasl_mechanism"`
|
||||
SASLVersion *int `toml:"sasl_version"`
|
||||
SASLUsername config.Secret `toml:"sasl_username"`
|
||||
SASLPassword config.Secret `toml:"sasl_password"`
|
||||
SASLMechanism string `toml:"sasl_mechanism"`
|
||||
SASLVersion *int `toml:"sasl_version"`
|
||||
|
||||
// GSSAPI config
|
||||
SASLGSSAPIServiceName string `toml:"sasl_gssapi_service_name"`
|
||||
|
|
@ -21,36 +23,46 @@ type SASLAuth struct {
|
|||
SASLGSSAPIRealm string `toml:"sasl_gssapi_realm"`
|
||||
|
||||
// OAUTHBEARER config. experimental. undoubtedly this is not good enough.
|
||||
SASLAccessToken string `toml:"sasl_access_token"`
|
||||
SASLAccessToken config.Secret `toml:"sasl_access_token"`
|
||||
}
|
||||
|
||||
// SetSASLConfig configures SASL for kafka (sarama)
|
||||
func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error {
|
||||
config.Net.SASL.User = k.SASLUsername
|
||||
config.Net.SASL.Password = k.SASLPassword
|
||||
func (k *SASLAuth) SetSASLConfig(cfg *sarama.Config) error {
|
||||
username, err := k.SASLUsername.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(username)
|
||||
password, err := k.SASLPassword.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(password)
|
||||
cfg.Net.SASL.User = string(username)
|
||||
cfg.Net.SASL.Password = string(password)
|
||||
|
||||
if k.SASLMechanism != "" {
|
||||
config.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism)
|
||||
switch config.Net.SASL.Mechanism {
|
||||
cfg.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism)
|
||||
switch cfg.Net.SASL.Mechanism {
|
||||
case sarama.SASLTypeSCRAMSHA256:
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
|
||||
}
|
||||
case sarama.SASLTypeSCRAMSHA512:
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||
}
|
||||
case sarama.SASLTypeOAuth:
|
||||
config.Net.SASL.TokenProvider = k // use self as token provider.
|
||||
cfg.Net.SASL.TokenProvider = k // use self as token provider.
|
||||
case sarama.SASLTypeGSSAPI:
|
||||
config.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
|
||||
config.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
|
||||
config.Net.SASL.GSSAPI.Username = k.SASLUsername
|
||||
config.Net.SASL.GSSAPI.Password = k.SASLPassword
|
||||
config.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
|
||||
config.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
|
||||
config.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
|
||||
config.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
|
||||
cfg.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
|
||||
cfg.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
|
||||
cfg.Net.SASL.GSSAPI.Username = string(username)
|
||||
cfg.Net.SASL.GSSAPI.Password = string(password)
|
||||
cfg.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
|
||||
cfg.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
|
||||
cfg.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
|
||||
cfg.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
|
||||
|
||||
case sarama.SASLTypePlaintext:
|
||||
// nothing.
|
||||
|
|
@ -58,22 +70,27 @@ func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error {
|
|||
}
|
||||
}
|
||||
|
||||
if k.SASLUsername != "" || k.SASLMechanism != "" {
|
||||
config.Net.SASL.Enable = true
|
||||
if len(username) > 0 || k.SASLMechanism != "" {
|
||||
cfg.Net.SASL.Enable = true
|
||||
|
||||
version, err := SASLVersion(config.Version, k.SASLVersion)
|
||||
version, err := SASLVersion(cfg.Version, k.SASLVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
config.Net.SASL.Version = version
|
||||
cfg.Net.SASL.Version = version
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Token does nothing smart, it just grabs a hard-coded token from config.
|
||||
func (k *SASLAuth) Token() (*sarama.AccessToken, error) {
|
||||
token, err := k.SASLAccessToken.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting token failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(token)
|
||||
return &sarama.AccessToken{
|
||||
Token: k.SASLAccessToken,
|
||||
Token: string(token),
|
||||
Extensions: map[string]string{},
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,8 +51,8 @@ type HTTPResponse struct {
|
|||
ResponseStatusCode int
|
||||
Interface string
|
||||
// HTTP Basic Auth Credentials
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
tls.ClientConfig
|
||||
|
||||
Log telegraf.Logger
|
||||
|
|
@ -219,8 +219,8 @@ func (h *HTTPResponse) httpGather(u string) (map[string]interface{}, map[string]
|
|||
}
|
||||
}
|
||||
|
||||
if h.Username != "" || h.Password != "" {
|
||||
request.SetBasicAuth(h.Username, h.Password)
|
||||
if err := h.setRequestAuth(request); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Start Timer
|
||||
|
|
@ -398,6 +398,23 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *HTTPResponse) setRequestAuth(request *http.Request) error {
|
||||
username, err := h.Username.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting username failed: %v", err)
|
||||
}
|
||||
defer config.ReleaseSecret(username)
|
||||
password, err := h.Password.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting password failed: %v", err)
|
||||
}
|
||||
defer config.ReleaseSecret(password)
|
||||
if len(username) != 0 || len(password) != 0 {
|
||||
request.SetBasicAuth(string(username), string(password))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("http_response", func() telegraf.Input {
|
||||
return &HTTPResponse{}
|
||||
|
|
|
|||
|
|
@ -1115,8 +1115,8 @@ func TestBasicAuth(t *testing.T) {
|
|||
Body: "{ 'test': 'data'}",
|
||||
Method: "GET",
|
||||
ResponseTimeout: config.Duration(time.Second * 20),
|
||||
Username: "me",
|
||||
Password: "mypassword",
|
||||
Username: config.NewSecret([]byte("me")),
|
||||
Password: config.NewSecret([]byte("mypassword")),
|
||||
Headers: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
|
|
|
|||
|
|
@ -65,8 +65,8 @@ type MQTTConsumer struct {
|
|||
Topics []string `toml:"topics"`
|
||||
TopicTag *string `toml:"topic_tag"`
|
||||
TopicParsing []TopicParsingConfig `toml:"topic_parsing"`
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
QoS int `toml:"qos"`
|
||||
ConnectionTimeout config.Duration `toml:"connection_timeout"`
|
||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
|
|
@ -331,16 +331,25 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
|||
if tlsCfg != nil {
|
||||
opts.SetTLSConfig(tlsCfg)
|
||||
}
|
||||
user := m.Username
|
||||
if user != "" {
|
||||
opts.SetUsername(user)
|
||||
if !m.Username.Empty() {
|
||||
user, err := m.Username.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
opts.SetUsername(string(user))
|
||||
config.ReleaseSecret(user)
|
||||
}
|
||||
password := m.Password
|
||||
if password != "" {
|
||||
opts.SetPassword(password)
|
||||
|
||||
if !m.Password.Empty() {
|
||||
password, err := m.Password.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
opts.SetPassword(string(password))
|
||||
config.ReleaseSecret(password)
|
||||
}
|
||||
if len(m.Servers) == 0 {
|
||||
return opts, fmt.Errorf("could not get host informations")
|
||||
return opts, fmt.Errorf("could not get host information")
|
||||
}
|
||||
for _, server := range m.Servers {
|
||||
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
|
@ -48,13 +49,15 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
|
|||
require.NoError(t, err, "failed to start container")
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s user=pgbouncer password=pgbouncer dbname=pgbouncer port=%s sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[pgBouncerServicePort],
|
||||
)
|
||||
|
||||
p := &PgBouncer{
|
||||
Service: postgresql.Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s user=pgbouncer password=pgbouncer dbname=pgbouncer port=%s sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[pgBouncerServicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
IsPgBouncer: true,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## to grab metrics for.
|
||||
##
|
||||
address = "host=localhost user=postgres sslmode=disable"
|
||||
|
||||
## A custom name for the database that will be used as the "server" tag in the
|
||||
## measurement output. If not specified, a default one generated from
|
||||
## the connection address is used.
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -42,13 +43,15 @@ func TestPostgresqlGeneratesMetricsIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
IsPgBouncer: false,
|
||||
},
|
||||
Databases: []string{"postgres"},
|
||||
|
|
@ -131,13 +134,15 @@ func TestPostgresqlTagsMetricsWithDatabaseNameIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
Databases: []string{"postgres"},
|
||||
}
|
||||
|
|
@ -161,13 +166,15 @@ func TestPostgresqlDefaultsToAllDatabasesIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -198,13 +205,15 @@ func TestPostgresqlIgnoresUnwantedColumnsIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -225,13 +234,15 @@ func TestPostgresqlDatabaseWhitelistTestIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
Databases: []string{"template0"},
|
||||
}
|
||||
|
|
@ -269,13 +280,15 @@ func TestPostgresqlDatabaseBlacklistTestIntegration(t *testing.T) {
|
|||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Service: Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
IgnoredDatabases: []string{"template0"},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@
|
|||
## to grab metrics for.
|
||||
##
|
||||
address = "host=localhost user=postgres sslmode=disable"
|
||||
|
||||
## A custom name for the database that will be used as the "server" tag in the
|
||||
## measurement output. If not specified, a default one generated from
|
||||
## the connection address is used.
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net"
|
||||
|
|
@ -88,26 +89,31 @@ func parseURL(uri string) (string, error) {
|
|||
// Service common functionality shared between the postgresql and postgresql_extensible
|
||||
// packages.
|
||||
type Service struct {
|
||||
Address string
|
||||
OutputAddress string
|
||||
MaxIdle int
|
||||
MaxOpen int
|
||||
MaxLifetime config.Duration
|
||||
Address config.Secret `toml:"address"`
|
||||
OutputAddress string `toml:"outputaddress"`
|
||||
MaxIdle int `toml:"max_idle"`
|
||||
MaxOpen int `toml:"max_open"`
|
||||
MaxLifetime config.Duration `toml:"max_lifetime"`
|
||||
IsPgBouncer bool `toml:"-"`
|
||||
DB *sql.DB
|
||||
IsPgBouncer bool `toml:"-"`
|
||||
}
|
||||
|
||||
var socketRegexp = regexp.MustCompile(`/\.s\.PGSQL\.\d+$`)
|
||||
|
||||
// Start starts the ServiceInput's service, whatever that may be
|
||||
func (p *Service) Start(telegraf.Accumulator) (err error) {
|
||||
const localhost = "host=localhost sslmode=disable"
|
||||
addr, err := p.Address.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting address failed: %v", err)
|
||||
}
|
||||
defer config.ReleaseSecret(addr)
|
||||
|
||||
if p.Address == "" || p.Address == "localhost" {
|
||||
p.Address = localhost
|
||||
if p.Address.Empty() || string(addr) == "localhost" {
|
||||
addr = []byte("host=localhost sslmode=disable")
|
||||
p.Address = config.NewSecret(addr)
|
||||
}
|
||||
|
||||
connConfig, err := pgx.ParseConfig(p.Address)
|
||||
connConfig, err := pgx.ParseConfig(string(addr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -146,23 +152,24 @@ var kvMatcher, _ = regexp.Compile(`(password|sslcert|sslkey|sslmode|sslrootcert)
|
|||
|
||||
// SanitizedAddress utility function to strip sensitive information from the connection string.
|
||||
func (p *Service) SanitizedAddress() (sanitizedAddress string, err error) {
|
||||
var (
|
||||
canonicalizedAddress string
|
||||
)
|
||||
|
||||
if p.OutputAddress != "" {
|
||||
return p.OutputAddress, nil
|
||||
}
|
||||
|
||||
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
|
||||
if canonicalizedAddress, err = parseURL(p.Address); err != nil {
|
||||
addr, err := p.Address.Get()
|
||||
if err != nil {
|
||||
return sanitizedAddress, fmt.Errorf("getting address for sanitization failed: %v", err)
|
||||
}
|
||||
defer config.ReleaseSecret(addr)
|
||||
|
||||
var canonicalizedAddress string
|
||||
if bytes.HasPrefix(addr, []byte("postgres://")) || bytes.HasPrefix(addr, []byte("postgresql://")) {
|
||||
if canonicalizedAddress, err = parseURL(string(addr)); err != nil {
|
||||
return sanitizedAddress, err
|
||||
}
|
||||
} else {
|
||||
canonicalizedAddress = p.Address
|
||||
canonicalizedAddress = string(addr)
|
||||
}
|
||||
|
||||
sanitizedAddress = kvMatcher.ReplaceAllString(canonicalizedAddress, "")
|
||||
|
||||
return sanitizedAddress, err
|
||||
return kvMatcher.ReplaceAllString(canonicalizedAddress, ""), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
|
@ -32,14 +33,16 @@ func queryRunner(t *testing.T, q query) *testutil.Accumulator {
|
|||
require.NoError(t, err, "failed to start container")
|
||||
defer container.Terminate()
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Log: testutil.Logger{},
|
||||
Service: postgresql.Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s port=%s user=postgres sslmode=disable",
|
||||
container.Address,
|
||||
container.Ports[servicePort],
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
IsPgBouncer: false,
|
||||
},
|
||||
Databases: []string{"postgres"},
|
||||
|
|
@ -239,13 +242,16 @@ func TestPostgresqlSqlScript(t *testing.T) {
|
|||
Withdbname: false,
|
||||
Tagvalue: "",
|
||||
}}
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s user=postgres sslmode=disable",
|
||||
testutil.GetLocalHost(),
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Log: testutil.Logger{},
|
||||
Service: postgresql.Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s user=postgres sslmode=disable",
|
||||
testutil.GetLocalHost(),
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
IsPgBouncer: false,
|
||||
},
|
||||
Databases: []string{"postgres"},
|
||||
|
|
@ -263,13 +269,15 @@ func TestPostgresqlIgnoresUnwantedColumnsIntegration(t *testing.T) {
|
|||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf(
|
||||
"host=%s user=postgres sslmode=disable",
|
||||
testutil.GetLocalHost(),
|
||||
)
|
||||
|
||||
p := &Postgresql{
|
||||
Log: testutil.Logger{},
|
||||
Service: postgresql.Service{
|
||||
Address: fmt.Sprintf(
|
||||
"host=%s user=postgres sslmode=disable",
|
||||
testutil.GetLocalHost(),
|
||||
),
|
||||
Address: config.NewSecret([]byte(addr)),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -109,5 +109,7 @@ snmp_trap,mib=SNMPv2-MIB,name=coldStart,oid=.1.3.6.1.6.3.1.1.5.1,source=192.168.
|
|||
snmp_trap,mib=NET-SNMP-AGENT-MIB,name=nsNotifyShutdown,oid=.1.3.6.1.4.1.8072.4.0.2,source=192.168.122.102,version=2c,community=public sysUpTimeInstance=5803i,snmpTrapEnterprise.0="netSnmpNotificationPrefix" 1574109186555115459
|
||||
```
|
||||
|
||||
[net-snmp]: http://www.net-snmp.org/
|
||||
[man snmpcmd]: http://net-snmp.sourceforge.net/docs/man/snmpcmd.html#lbAK
|
||||
## References
|
||||
|
||||
- [net-snmp project home](http://www.net-snmp.org)
|
||||
- [`snmpcmd` man-page](http://net-snmp.sourceforge.net/docs/man/snmpcmd.html)
|
||||
|
|
|
|||
|
|
@ -35,14 +35,14 @@ type SnmpTrap struct {
|
|||
|
||||
// Settings for version 3
|
||||
// Values: "noAuthNoPriv", "authNoPriv", "authPriv"
|
||||
SecLevel string `toml:"sec_level"`
|
||||
SecName string `toml:"sec_name"`
|
||||
SecLevel string `toml:"sec_level"`
|
||||
SecName config.Secret `toml:"sec_name"`
|
||||
// Values: "MD5", "SHA", "". Default: ""
|
||||
AuthProtocol string `toml:"auth_protocol"`
|
||||
AuthPassword string `toml:"auth_password"`
|
||||
AuthProtocol string `toml:"auth_protocol"`
|
||||
AuthPassword config.Secret `toml:"auth_password"`
|
||||
// Values: "DES", "AES", "". Default: ""
|
||||
PrivProtocol string `toml:"priv_protocol"`
|
||||
PrivPassword string `toml:"priv_password"`
|
||||
PrivProtocol string `toml:"priv_protocol"`
|
||||
PrivPassword config.Secret `toml:"priv_password"`
|
||||
|
||||
acc telegraf.Accumulator
|
||||
listener *gosnmp.TrapListener
|
||||
|
|
@ -171,13 +171,28 @@ func (s *SnmpTrap) Start(acc telegraf.Accumulator) error {
|
|||
return fmt.Errorf("unknown privacy protocol '%s'", s.PrivProtocol)
|
||||
}
|
||||
|
||||
secname, err := s.SecName.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting secname failed: %v", err)
|
||||
}
|
||||
privPasswd, err := s.PrivPassword.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting secname failed: %v", err)
|
||||
}
|
||||
authPasswd, err := s.AuthPassword.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting secname failed: %v", err)
|
||||
}
|
||||
s.listener.Params.SecurityParameters = &gosnmp.UsmSecurityParameters{
|
||||
UserName: s.SecName,
|
||||
UserName: string(secname),
|
||||
PrivacyProtocol: privacyProtocol,
|
||||
PrivacyPassphrase: s.PrivPassword,
|
||||
AuthenticationPassphrase: s.AuthPassword,
|
||||
PrivacyPassphrase: string(privPasswd),
|
||||
AuthenticationPassphrase: string(authPasswd),
|
||||
AuthenticationProtocol: authenticationProtocol,
|
||||
}
|
||||
config.ReleaseSecret(secname)
|
||||
config.ReleaseSecret(privPasswd)
|
||||
config.ReleaseSecret(authPasswd)
|
||||
}
|
||||
|
||||
// wrap the handler, used in unit tests
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
|
@ -1274,12 +1275,12 @@ func TestReceiveTrap(t *testing.T) {
|
|||
//if cold start be answer otherwise err
|
||||
Log: testutil.Logger{},
|
||||
Version: tt.version.String(),
|
||||
SecName: tt.secName,
|
||||
SecName: config.NewSecret([]byte(tt.secName)),
|
||||
SecLevel: tt.secLevel,
|
||||
AuthProtocol: tt.authProto,
|
||||
AuthPassword: tt.authPass,
|
||||
AuthPassword: config.NewSecret([]byte(tt.authPass)),
|
||||
PrivProtocol: tt.privProto,
|
||||
PrivPassword: tt.privPass,
|
||||
PrivPassword: config.NewSecret([]byte(tt.privPass)),
|
||||
Translator: "netsnmp",
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -146,7 +146,6 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## "database_type" enables a specific set of queries depending on the database type. If specified, it replaces azuredb = true/false and query_version = 2
|
||||
## In the config file, the sql server plugin section should be repeated each with a set of servers for a specific database_type.
|
||||
## Possible values for database_type are - "SQLServer" or "AzureSQLDB" or "AzureSQLManagedInstance" or "AzureSQLPool"
|
||||
|
||||
database_type = "SQLServer"
|
||||
|
||||
## A list of queries to include. If not specified, all the below listed queries are used.
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
func TestAzureSQLIntegration_Database_ResourceStats_Query(t *testing.T) {
|
||||
|
|
@ -18,9 +20,10 @@ func TestAzureSQLIntegration_Database_ResourceStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBResourceStats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -63,9 +66,10 @@ func TestAzureSQLIntegration_Database_ResourceGovernance_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBResourceGovernance"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -124,9 +128,10 @@ func TestAzureSQLIntegration_Database_WaitStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBWaitStats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -161,9 +166,10 @@ func TestAzureSQLIntegration_Database_DatabaseIO_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBDatabaseIO"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -207,9 +213,10 @@ func TestAzureSQLIntegration_Database_ServerProperties_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBServerProperties"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -248,9 +255,10 @@ func TestAzureSQLIntegration_Database_OsWaitstats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBOsWaitstats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -286,9 +294,10 @@ func TestAzureSQLIntegration_Database_MemoryClerks_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBMemoryClerks"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -319,9 +328,10 @@ func TestAzureSQLIntegration_Database_PerformanceCounters_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBPerformanceCounters"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -355,9 +365,10 @@ func TestAzureSQLIntegration_Database_Requests_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBRequests"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
@ -413,9 +424,10 @@ func TestAzureSQLIntegration_Database_Schedulers_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_DB_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLDBSchedulers"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLDB",
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -18,9 +19,10 @@ func TestAzureSQLIntegration_Managed_ResourceStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIResourceStats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -51,9 +53,10 @@ func TestAzureSQLIntegration_Managed_ResourceGovernance_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIResourceGovernance"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -92,9 +95,10 @@ func TestAzureSQLIntegration_Managed_DatabaseIO_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIDatabaseIO"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -134,9 +138,10 @@ func TestAzureSQLIntegration_Managed_ServerProperties_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIServerProperties"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -181,9 +186,10 @@ func TestAzureSQLIntegration_Managed_OsWaitStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIOsWaitstats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -218,9 +224,10 @@ func TestAzureSQLIntegration_Managed_MemoryClerks_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIMemoryClerks"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -250,9 +257,10 @@ func TestAzureSQLIntegration_Managed_PerformanceCounters_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIPerformanceCounters"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -285,9 +293,10 @@ func TestAzureSQLIntegration_Managed_Requests_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMIRequests"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
@ -343,9 +352,10 @@ func TestAzureSQLIntegration_Managed_Schedulers_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_MI_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLMISchedulers"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLManagedInstance",
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -18,9 +19,10 @@ func TestAzureSQLIntegration_ElasticPool_ResourceStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolResourceStats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -60,9 +62,10 @@ func TestAzureSQLIntegration_ElasticPool_ResourceGovernance_Query(t *testing.T)
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolResourceGovernance"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -123,9 +126,10 @@ func TestAzureSQLIntegration_ElasticPool_DatabaseIO_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolDatabaseIO"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -167,9 +171,10 @@ func TestAzureSQLIntegration_ElasticPool_OsWaitStats_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolOsWaitStats"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -204,9 +209,10 @@ func TestAzureSQLIntegration_ElasticPool_MemoryClerks_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolMemoryClerks"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -236,9 +242,10 @@ func TestAzureSQLIntegration_ElasticPool_PerformanceCounters_Query(t *testing.T)
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolPerformanceCounters"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
@ -270,9 +277,10 @@ func TestAzureSQLIntegration_ElasticPool_Schedulers_Query(t *testing.T) {
|
|||
}
|
||||
|
||||
connectionString := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
serversList := []config.Secret{config.NewSecret([]byte(connectionString))}
|
||||
|
||||
server := &SQLServer{
|
||||
Servers: []string{connectionString},
|
||||
Servers: serversList,
|
||||
IncludeQuery: []string{"AzureSQLPoolSchedulers"},
|
||||
AuthMethod: "connection_string",
|
||||
DatabaseType: "AzureSQLPool",
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@
|
|||
## "database_type" enables a specific set of queries depending on the database type. If specified, it replaces azuredb = true/false and query_version = 2
|
||||
## In the config file, the sql server plugin section should be repeated each with a set of servers for a specific database_type.
|
||||
## Possible values for database_type are - "SQLServer" or "AzureSQLDB" or "AzureSQLManagedInstance" or "AzureSQLPool"
|
||||
|
||||
database_type = "SQLServer"
|
||||
|
||||
## A list of queries to include. If not specified, all the below listed queries are used.
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ var sampleConfig string
|
|||
|
||||
// SQLServer struct
|
||||
type SQLServer struct {
|
||||
Servers []string `toml:"servers"`
|
||||
Servers []config.Secret `toml:"servers"`
|
||||
QueryTimeout config.Duration `toml:"query_timeout"`
|
||||
AuthMethod string `toml:"auth_method"`
|
||||
QueryVersion int `toml:"query_version" deprecated:"1.16.0;use 'database_type' instead"`
|
||||
|
|
@ -206,12 +206,17 @@ func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
|
|||
wg.Add(1)
|
||||
go func(pool *sql.DB, query Query, serverIndex int) {
|
||||
defer wg.Done()
|
||||
connectionString := s.Servers[serverIndex]
|
||||
queryError := s.gatherServer(pool, query, acc, connectionString)
|
||||
dsn, err := s.Servers[serverIndex].Get()
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
defer config.ReleaseSecret(dsn)
|
||||
queryError := s.gatherServer(pool, query, acc, string(dsn))
|
||||
|
||||
if s.HealthMetric {
|
||||
mutex.Lock()
|
||||
s.gatherHealth(healthMetrics, connectionString, queryError)
|
||||
s.gatherHealth(healthMetrics, string(dsn), queryError)
|
||||
mutex.Unlock()
|
||||
}
|
||||
|
||||
|
|
@ -244,19 +249,24 @@ func (s *SQLServer) Start(acc telegraf.Accumulator) error {
|
|||
|
||||
switch strings.ToLower(s.AuthMethod) {
|
||||
case "connection_string":
|
||||
// Get the connection string potentially containing secrets
|
||||
dsn, err := serv.Get()
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use the DSN (connection string) directly. In this case,
|
||||
// empty username/password causes use of Windows
|
||||
// integrated authentication.
|
||||
var err error
|
||||
pool, err = sql.Open("mssql", serv)
|
||||
|
||||
pool, err = sql.Open("mssql", string(dsn))
|
||||
config.ReleaseSecret(dsn)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
continue
|
||||
}
|
||||
case "aad":
|
||||
// AAD Auth with system-assigned managed identity (MSI)
|
||||
|
||||
// AAD Auth is only supported for Azure SQL Database or Azure SQL Managed Instance
|
||||
if s.DatabaseType == "SQLServer" {
|
||||
err := errors.New("database connection failed : AAD auth is not supported for SQL VM i.e. DatabaseType=SQLServer")
|
||||
|
|
@ -271,7 +281,14 @@ func (s *SQLServer) Start(acc telegraf.Accumulator) error {
|
|||
continue
|
||||
}
|
||||
|
||||
connector, err := mssql.NewAccessTokenConnector(serv, tokenProvider)
|
||||
// Get the connection string potentially containing secrets
|
||||
dsn, err := serv.Get()
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
continue
|
||||
}
|
||||
connector, err := mssql.NewAccessTokenConnector(string(dsn), tokenProvider)
|
||||
config.ReleaseSecret(dsn)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("error creating the SQL connector : %s", err.Error()))
|
||||
continue
|
||||
|
|
@ -529,7 +546,7 @@ func (s *SQLServer) refreshToken() (*adal.Token, error) {
|
|||
func init() {
|
||||
inputs.Add("sqlserver", func() telegraf.Input {
|
||||
return &SQLServer{
|
||||
Servers: []string{defaultServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(defaultServer))},
|
||||
AuthMethod: "connection_string",
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -116,12 +117,12 @@ func TestSqlServerIntegration_MultipleInstance(t *testing.T) {
|
|||
|
||||
testServer := "Server=127.0.0.1;Port=1433;User Id=SA;Password=ABCabc01;app name=telegraf;log=1"
|
||||
s := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
ExcludeQuery: []string{"MemoryClerk"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
s2 := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
ExcludeQuery: []string{"DatabaseSize"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
|
@ -153,12 +154,12 @@ func TestSqlServerIntegration_MultipleInstanceWithHealthMetric(t *testing.T) {
|
|||
|
||||
testServer := "Server=127.0.0.1;Port=1433;User Id=SA;Password=ABCabc01;app name=telegraf;log=1"
|
||||
s := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
ExcludeQuery: []string{"MemoryClerk"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
s2 := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
ExcludeQuery: []string{"DatabaseSize"},
|
||||
HealthMetric: true,
|
||||
Log: testutil.Logger{},
|
||||
|
|
@ -194,7 +195,10 @@ func TestSqlServer_HealthMetric(t *testing.T) {
|
|||
fakeServer2 := "localhost\\fakeinstance2;Database=fakedb2;Password=ABCabc01;"
|
||||
|
||||
s1 := &SQLServer{
|
||||
Servers: []string{fakeServer1, fakeServer2},
|
||||
Servers: []config.Secret{
|
||||
config.NewSecret([]byte(fakeServer1)),
|
||||
config.NewSecret([]byte(fakeServer2)),
|
||||
},
|
||||
IncludeQuery: []string{"DatabaseSize", "MemoryClerk"},
|
||||
HealthMetric: true,
|
||||
AuthMethod: "connection_string",
|
||||
|
|
@ -202,7 +206,7 @@ func TestSqlServer_HealthMetric(t *testing.T) {
|
|||
}
|
||||
|
||||
s2 := &SQLServer{
|
||||
Servers: []string{fakeServer1},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(fakeServer1))},
|
||||
IncludeQuery: []string{"DatabaseSize"},
|
||||
AuthMethod: "connection_string",
|
||||
Log: testutil.Logger{},
|
||||
|
|
@ -344,13 +348,13 @@ func TestSqlServerIntegration_AGQueriesApplicableForDatabaseTypeSQLServer(t *tes
|
|||
testServer := os.Getenv("AZURESQL_POOL_CONNECTION_STRING")
|
||||
|
||||
s := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
DatabaseType: "SQLServer",
|
||||
IncludeQuery: []string{"SQLServerAvailabilityReplicaStates", "SQLServerDatabaseReplicaStates"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
s2 := &SQLServer{
|
||||
Servers: []string{testServer},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer))},
|
||||
DatabaseType: "AzureSQLDB",
|
||||
IncludeQuery: []string{"SQLServerAvailabilityReplicaStates", "SQLServerDatabaseReplicaStates"},
|
||||
Log: testutil.Logger{},
|
||||
|
|
@ -395,13 +399,13 @@ func TestSqlServerIntegration_AGQueryFieldsOutputBasedOnSQLServerVersion(t *test
|
|||
testServer2012 := os.Getenv("AZURESQL_POOL_CONNECTION_STRING_2012")
|
||||
|
||||
s2019 := &SQLServer{
|
||||
Servers: []string{testServer2019},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer2019))},
|
||||
DatabaseType: "SQLServer",
|
||||
IncludeQuery: []string{"SQLServerAvailabilityReplicaStates", "SQLServerDatabaseReplicaStates"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
s2012 := &SQLServer{
|
||||
Servers: []string{testServer2012},
|
||||
Servers: []config.Secret{config.NewSecret([]byte(testServer2012))},
|
||||
DatabaseType: "SQLServer",
|
||||
IncludeQuery: []string{"SQLServerAvailabilityReplicaStates", "SQLServerDatabaseReplicaStates"},
|
||||
Log: testutil.Logger{},
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/vmware/govmomi/vim25/types"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
)
|
||||
|
||||
// The highest number of metrics we can query for, no matter what settings
|
||||
|
|
@ -99,9 +100,24 @@ func (cf *ClientFactory) testClient(ctx context.Context) error {
|
|||
cf.parent.Log.Info("Client session seems to have time out. Reauthenticating!")
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, time.Duration(cf.parent.Timeout))
|
||||
defer cancel2()
|
||||
if err := cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)); err != nil {
|
||||
return fmt.Errorf("renewing authentication failed: %s", err.Error())
|
||||
|
||||
// Resolving the secrets and construct the authentication info
|
||||
username, err := cf.parent.Username.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(username)
|
||||
password, err := cf.parent.Password.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(password)
|
||||
auth := url.UserPassword(string(username), string(password))
|
||||
|
||||
if err := cf.client.Client.SessionManager.Login(ctx2, auth); err != nil {
|
||||
return fmt.Errorf("renewing authentication failed: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -120,8 +136,19 @@ func NewClient(ctx context.Context, vSphereURL *url.URL, vs *VSphere) (*Client,
|
|||
if tlsCfg == nil {
|
||||
tlsCfg = &tls.Config{}
|
||||
}
|
||||
if vs.Username != "" {
|
||||
vSphereURL.User = url.UserPassword(vs.Username, vs.Password)
|
||||
if !vs.Username.Empty() {
|
||||
// Resolving the secrets and construct the authentication info
|
||||
username, err := vs.Username.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(username)
|
||||
password, err := vs.Password.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
defer config.ReleaseSecret(password)
|
||||
vSphereURL.User = url.UserPassword(string(username), string(password))
|
||||
}
|
||||
|
||||
vs.Log.Debugf("Creating client: %s", vSphereURL.Host)
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ var sampleConfig string
|
|||
// and a list of connected vSphere endpoints
|
||||
type VSphere struct {
|
||||
Vcenters []string
|
||||
Username string
|
||||
Password string
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
DatacenterInstances bool
|
||||
DatacenterMetricInclude []string
|
||||
DatacenterMetricExclude []string
|
||||
|
|
|
|||
|
|
@ -225,9 +225,7 @@ func TestMaxQuery(t *testing.T) {
|
|||
return
|
||||
}
|
||||
m, s, err := createSim(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer m.Remove()
|
||||
defer s.Close()
|
||||
|
||||
|
|
@ -235,9 +233,7 @@ func TestMaxQuery(t *testing.T) {
|
|||
v.MaxQueryMetrics = 256
|
||||
ctx := context.Background()
|
||||
c, err := NewClient(ctx, s.URL, v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 256, v.MaxQueryMetrics)
|
||||
|
||||
om := object.NewOptionManager(c.Client.Client, *c.Client.Client.ServiceContent.Setting)
|
||||
|
|
@ -245,16 +241,12 @@ func TestMaxQuery(t *testing.T) {
|
|||
Key: "config.vpxd.stats.maxQueryMetrics",
|
||||
Value: "42",
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
v.MaxQueryMetrics = 256
|
||||
ctx = context.Background()
|
||||
c2, err := NewClient(ctx, s.URL, v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 42, v.MaxQueryMetrics)
|
||||
c.close()
|
||||
c2.close()
|
||||
|
|
@ -287,9 +279,7 @@ func TestFinder(t *testing.T) {
|
|||
}
|
||||
|
||||
m, s, err := createSim(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer m.Remove()
|
||||
defer s.Close()
|
||||
|
||||
|
|
@ -413,9 +403,7 @@ func TestFolders(t *testing.T) {
|
|||
}
|
||||
|
||||
m, s, err := createSim(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer m.Remove()
|
||||
defer s.Close()
|
||||
|
||||
|
|
@ -477,8 +465,8 @@ func testCollection(t *testing.T, excludeClusters bool) {
|
|||
v := defaultVSphere()
|
||||
if vCenter != "" {
|
||||
v.Vcenters = []string{vCenter}
|
||||
v.Username = username
|
||||
v.Password = password
|
||||
v.Username = config.NewSecret([]byte(username))
|
||||
v.Password = config.NewSecret([]byte(password))
|
||||
} else {
|
||||
// Don't run test on 32-bit machines due to bug in simulator.
|
||||
// https://github.com/vmware/govmomi/issues/1330
|
||||
|
|
@ -488,9 +476,7 @@ func testCollection(t *testing.T, excludeClusters bool) {
|
|||
}
|
||||
|
||||
m, s, err := createSim(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer m.Remove()
|
||||
defer s.Close()
|
||||
v.Vcenters = []string{s.URL.String()}
|
||||
|
|
|
|||
Loading…
Reference in New Issue