input/sqlserver: Add service and save connection pools (#8596)

This commit is contained in:
Mattias Jiderhamn 2021-03-29 17:22:36 +02:00 committed by GitHub
parent e6165ecd18
commit 871447b22c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 54 deletions

View File

@ -79,6 +79,7 @@ following works:
- github.com/gofrs/uuid [MIT License](https://github.com/gofrs/uuid/blob/master/LICENSE)
- github.com/gogo/googleapis [Apache License 2.0](https://github.com/gogo/googleapis/blob/master/LICENSE)
- github.com/gogo/protobuf [BSD 3-Clause Clear License](https://github.com/gogo/protobuf/blob/master/LICENSE)
- github.com/golang-sql/civil [Apache License 2.0](https://github.com/golang-sql/civil/blob/master/LICENSE)
- github.com/golang/geo [Apache License 2.0](https://github.com/golang/geo/blob/master/LICENSE)
- github.com/golang/groupcache [Apache License 2.0](https://github.com/golang/groupcache/blob/master/LICENSE)
- github.com/golang/protobuf [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/protobuf/blob/master/LICENSE)

2
go.mod
View File

@ -43,7 +43,7 @@ require (
github.com/couchbase/go-couchbase v0.0.0-20180501122049-16db1f1fe037
github.com/couchbase/gomemcached v0.0.0-20180502221210-0da75df14530 // indirect
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4
github.com/denisenkom/go-mssqldb v0.9.0
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dimchansky/utfbom v1.1.1
github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible

10
go.sum
View File

@ -1,6 +1,5 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
@ -260,8 +259,8 @@ github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADGYw5LqMnHqSkyIELsHCGF6PkrmM31V8rF7o=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/denisenkom/go-mssqldb v0.9.0 h1:RSohk2RsiZqLZ0zCjtfn3S4Gp4exhpBWHyQ7D0yGjAk=
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
@ -483,6 +482,8 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec h1:lJwO/92dFXWeXOZdoGXgptLmNLwynMSHUmU6besqtiw=
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
@ -1317,6 +1318,7 @@ golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1345,6 +1347,7 @@ golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1462,7 +1465,6 @@ google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuh
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=

View File

@ -15,15 +15,15 @@ import (
// SQLServer struct
type SQLServer struct {
Servers []string `toml:"servers"`
QueryVersion int `toml:"query_version"`
AzureDB bool `toml:"azuredb"`
DatabaseType string `toml:"database_type"`
IncludeQuery []string `toml:"include_query"`
ExcludeQuery []string `toml:"exclude_query"`
HealthMetric bool `toml:"health_metric"`
queries MapQuery
isInitialized bool
Servers []string `toml:"servers"`
QueryVersion int `toml:"query_version"`
AzureDB bool `toml:"azuredb"`
DatabaseType string `toml:"database_type"`
IncludeQuery []string `toml:"include_query"`
ExcludeQuery []string `toml:"exclude_query"`
HealthMetric bool `toml:"health_metric"`
pools []*sql.DB
queries MapQuery
}
// Query struct
@ -223,8 +223,6 @@ func initQueries(s *SQLServer) error {
}
}
// Set a flag so we know that queries have already been initialized
s.isInitialized = true
var querylist []string
for query := range queries {
querylist = append(querylist, query)
@ -236,32 +234,25 @@ func initQueries(s *SQLServer) error {
// Gather collect data from SQL Server
func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
if !s.isInitialized {
if err := initQueries(s); err != nil {
acc.AddError(err)
return err
}
}
var wg sync.WaitGroup
var mutex sync.Mutex
var healthMetrics = make(map[string]*HealthMetric)
for _, serv := range s.Servers {
for i, pool := range s.pools {
for _, query := range s.queries {
wg.Add(1)
go func(serv string, query Query) {
go func(pool *sql.DB, query Query, serverIndex int) {
defer wg.Done()
queryError := s.gatherServer(serv, query, acc)
queryError := s.gatherServer(pool, query, acc)
if s.HealthMetric {
mutex.Lock()
s.gatherHealth(healthMetrics, serv, queryError)
s.gatherHealth(healthMetrics, s.Servers[serverIndex], queryError)
mutex.Unlock()
}
acc.AddError(queryError)
}(serv, query)
}(pool, query, i)
}
}
@ -274,16 +265,40 @@ func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
return nil
}
func (s *SQLServer) gatherServer(server string, query Query, acc telegraf.Accumulator) error {
// deferred opening
conn, err := sql.Open("mssql", server)
if err != nil {
// Start initialize a list of connection pools
func (s *SQLServer) Start(acc telegraf.Accumulator) error {
if err := initQueries(s); err != nil {
acc.AddError(err)
return err
}
defer conn.Close()
if len(s.Servers) == 0 {
s.Servers = append(s.Servers, defaultServer)
}
for _, serv := range s.Servers {
pool, err := sql.Open("mssql", serv)
if err != nil {
acc.AddError(err)
return err
}
s.pools = append(s.pools, pool)
}
return nil
}
// Stop cleanup server connection pools
func (s *SQLServer) Stop() {
for _, pool := range s.pools {
_ = pool.Close()
}
}
func (s *SQLServer) gatherServer(pool *sql.DB, query Query, acc telegraf.Accumulator) error {
// execute query
rows, err := conn.Query(query.Script)
rows, err := pool.Query(query.Script)
if err != nil {
return fmt.Errorf("Script %s failed: %w", query.ScriptName, err)
//return err

View File

@ -124,15 +124,13 @@ func TestSqlServer_MultipleInstanceIntegration(t *testing.T) {
}
var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)
require.NoError(t, s2.Start(&acc2))
err = s2.Gather(&acc2)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)
// acc includes size metrics, and excludes memory metrics
assert.False(t, acc.HasMeasurement("Memory breakdown (%)"))
@ -141,6 +139,9 @@ func TestSqlServer_MultipleInstanceIntegration(t *testing.T) {
// acc2 includes memory metrics, and excludes size metrics
assert.True(t, acc2.HasMeasurement("Memory breakdown (%)"))
assert.False(t, acc2.HasMeasurement("Log size (bytes)"))
s.Stop()
s2.Stop()
}
func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
@ -162,15 +163,13 @@ func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
}
var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)
require.NoError(t, s2.Start(&acc))
err = s2.Gather(&acc2)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)
// acc includes size metrics, and excludes memory metrics and the health metric
assert.False(t, acc.HasMeasurement(healthMetricName))
@ -186,6 +185,9 @@ func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
tags := map[string]string{healthMetricInstanceTag: sqlInstance, healthMetricDatabaseTag: database}
assert.True(t, acc2.HasPoint(healthMetricName, tags, healthMetricAttemptedQueries, 9))
assert.True(t, acc2.HasPoint(healthMetricName, tags, healthMetricSuccessfulQueries, 9))
s.Stop()
s2.Stop()
}
func TestSqlServer_HealthMetric(t *testing.T) {
@ -205,6 +207,7 @@ func TestSqlServer_HealthMetric(t *testing.T) {
// acc1 should have the health metric because it is specified in the config
var acc1 testutil.Accumulator
require.NoError(t, s1.Start(&acc1))
s1.Gather(&acc1)
assert.True(t, acc1.HasMeasurement(healthMetricName))
@ -222,8 +225,12 @@ func TestSqlServer_HealthMetric(t *testing.T) {
// acc2 should not have the health metric because it is not specified in the config
var acc2 testutil.Accumulator
require.NoError(t, s2.Start(&acc2))
s2.Gather(&acc2)
assert.False(t, acc2.HasMeasurement(healthMetricName))
s1.Stop()
s2.Stop()
}
func TestSqlServer_MultipleInit(t *testing.T) {
@ -236,15 +243,14 @@ func TestSqlServer_MultipleInit(t *testing.T) {
_, ok := s.queries["DatabaseSize"]
// acc includes size metrics
assert.True(t, ok)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)
initQueries(s2)
_, ok = s2.queries["DatabaseSize"]
// acc2 excludes size metrics
assert.False(t, ok)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)
s.Stop()
s2.Stop()
}
func TestSqlServer_ConnectionString(t *testing.T) {
@ -349,15 +355,13 @@ func TestSqlServer_AGQueriesApplicableForDatabaseTypeSQLServer(t *testing.T) {
}
var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)
err = s2.Gather(&acc2)
require.NoError(t, s2.Start(&acc))
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)
// acc includes size metrics, and excludes memory metrics
assert.True(t, acc.HasMeasurement("sqlserver_hadr_replica_states"))
@ -366,6 +370,9 @@ func TestSqlServer_AGQueriesApplicableForDatabaseTypeSQLServer(t *testing.T) {
// acc2 includes memory metrics, and excludes size metrics
assert.False(t, acc2.HasMeasurement("sqlserver_hadr_replica_states"))
assert.False(t, acc2.HasMeasurement("sqlserver_hadr_dbreplica_states"))
s.Stop()
s2.Stop()
}
func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
@ -390,15 +397,13 @@ func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
}
var acc2019, acc2012 testutil.Accumulator
require.NoError(t, s2019.Start(&acc2019))
err := s2019.Gather(&acc2019)
require.NoError(t, err)
assert.Equal(t, s2019.isInitialized, true)
assert.Equal(t, s2012.isInitialized, false)
err = s2012.Gather(&acc2012)
require.NoError(t, s2012.Start(&acc2012))
require.NoError(t, err)
assert.Equal(t, s2019.isInitialized, true)
assert.Equal(t, s2012.isInitialized, true)
// acc2019 includes new HADR query fields
assert.True(t, acc2019.HasField("sqlserver_hadr_replica_states", "basic_features"))
@ -415,6 +420,9 @@ func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
assert.False(t, acc2012.HasTag("sqlserver_hadr_replica_states", "seeding_mode_desc"))
assert.False(t, acc2012.HasField("sqlserver_hadr_dbreplica_states", "is_primary_replica"))
assert.False(t, acc2012.HasField("sqlserver_hadr_dbreplica_states", "secondary_lag_seconds"))
s2019.Stop()
s2012.Stop()
}
const mockPerformanceMetrics = `measurement;servername;type;Point In Time Recovery;Available physical memory (bytes);Average pending disk IO;Average runnable tasks;Average tasks;Buffer pool rate (bytes/sec);Connection memory per connection (bytes);Memory grant pending;Page File Usage (%);Page lookup per batch request;Page split per batch request;Readahead per page read;Signal wait (%);Sql compilation per batch request;Sql recompilation per batch request;Total target memory ratio