From 96d6da63f2804631b97315cb67b271acfec9500a Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Wed, 24 Apr 2024 21:18:15 +0100 Subject: [PATCH] fix(http): Stop plugins from leaking file descriptors on telegraf reload (#15213) --- plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go | 3 +++ plugins/inputs/elasticsearch/elasticsearch.go | 10 ++++++++++ .../elasticsearch_query/elasticsearch_query.go | 10 ++++++++++ plugins/inputs/http/http.go | 10 ++++++++++ plugins/inputs/kibana/kibana.go | 10 ++++++++++ plugins/inputs/logstash/logstash.go | 10 ++++++++++ plugins/inputs/openstack/openstack.go | 12 ++++++++++-- plugins/inputs/prometheus/prometheus.go | 4 ++++ plugins/inputs/vault/vault.go | 10 ++++++++++ plugins/outputs/cloudwatch/cloudwatch.go | 10 +++++++++- plugins/outputs/http/http.go | 4 ++++ plugins/secretstores/http/http.go | 9 +++++++++ 12 files changed, 99 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go index 525a5c4bb..dd0c4e8c6 100644 --- a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go @@ -353,6 +353,9 @@ func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) { func (c *CtrlXDataLayer) Stop() { c.cancel() c.wg.Wait() + if c.connection != nil { + c.connection.CloseIdleConnections() + } } // Gather is called by telegraf to collect the metrics. diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index b3409291b..8a1265ee2 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -185,6 +185,10 @@ func (e *Elasticsearch) Init() error { return nil } +func (e *Elasticsearch) Start(_ telegraf.Accumulator) error { + return nil +} + // Gather reads the stats from Elasticsearch and writes it to the // Accumulator. func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { @@ -282,6 +286,12 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { return nil } +func (e *Elasticsearch) Stop() { + if e.client != nil { + e.client.CloseIdleConnections() + } +} + func (e *Elasticsearch) createHTTPClient() (*http.Client, error) { ctx := context.Background() if e.HTTPTimeout != 0 { diff --git a/plugins/inputs/elasticsearch_query/elasticsearch_query.go b/plugins/inputs/elasticsearch_query/elasticsearch_query.go index 8e2fc7cc2..74856d170 100644 --- a/plugins/inputs/elasticsearch_query/elasticsearch_query.go +++ b/plugins/inputs/elasticsearch_query/elasticsearch_query.go @@ -173,6 +173,10 @@ func (e *ElasticsearchQuery) connectToES() error { return nil } +func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error { + return nil +} + // Gather writes the results of the queries from Elasticsearch to the Accumulator. func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup @@ -197,6 +201,12 @@ func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error { return nil } +func (e *ElasticsearchQuery) Stop() { + if e.httpclient != nil { + e.httpclient.CloseIdleConnections() + } +} + func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) { ctx := context.Background() return e.HTTPClientConfig.CreateClient(ctx, e.Log) diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index 2c2024977..aca9e26b2 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -80,6 +80,10 @@ func (h *HTTP) Init() error { return nil } +func (h *HTTP) Start(_ telegraf.Accumulator) error { + return nil +} + // Gather takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" func (h *HTTP) Gather(acc telegraf.Accumulator) error { @@ -99,6 +103,12 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error { return nil } +func (h *HTTP) Stop() { + if h.client != nil { + h.client.CloseIdleConnections() + } +} + // SetParserFunc takes the data_format from the config and finds the right parser for that format func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) { h.parserFunc = fn diff --git a/plugins/inputs/kibana/kibana.go b/plugins/inputs/kibana/kibana.go index c00a52101..5e1836934 100644 --- a/plugins/inputs/kibana/kibana.go +++ b/plugins/inputs/kibana/kibana.go @@ -123,6 +123,10 @@ func (*Kibana) SampleConfig() string { return sampleConfig } +func (k *Kibana) Start(_ telegraf.Accumulator) error { + return nil +} + func (k *Kibana) Gather(acc telegraf.Accumulator) error { if k.client == nil { client, err := k.createHTTPClient() @@ -150,6 +154,12 @@ func (k *Kibana) Gather(acc telegraf.Accumulator) error { return nil } +func (k *Kibana) Stop() { + if k.client != nil { + k.client.CloseIdleConnections() + } +} + func (k *Kibana) createHTTPClient() (*http.Client, error) { ctx := context.Background() return k.HTTPClientConfig.CreateClient(ctx, k.Log) diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index 05f009706..27134142e 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -454,6 +454,10 @@ func (logstash *Logstash) gatherPipelinesStats(address string, accumulator teleg return nil } +func (logstash *Logstash) Start(_ telegraf.Accumulator) error { + return nil +} + // Gather ask this plugin to start gathering metrics func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { if logstash.client == nil { @@ -508,6 +512,12 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { return nil } +func (logstash *Logstash) Stop() { + if logstash.client != nil { + logstash.client.CloseIdleConnections() + } +} + // init registers this plugin instance func init() { inputs.Add("logstash", func() telegraf.Input { diff --git a/plugins/inputs/openstack/openstack.go b/plugins/inputs/openstack/openstack.go index e1d89fabe..1234a6116 100644 --- a/plugins/inputs/openstack/openstack.go +++ b/plugins/inputs/openstack/openstack.go @@ -15,6 +15,7 @@ import ( _ "embed" "errors" "fmt" + "net/http" "regexp" "sort" "strconv" @@ -79,6 +80,8 @@ type OpenStack struct { Log telegraf.Logger `toml:"-"` httpconfig.HTTPClientConfig + client *http.Client + // Locally cached clients identity *gophercloud.ServiceClient compute *gophercloud.ServiceClient @@ -153,7 +156,8 @@ func (o *OpenStack) Start(_ telegraf.Accumulator) error { return err } - provider.HTTPClient = *client + o.client = client + provider.HTTPClient = *o.client // Authenticate to the endpoint authOption := gophercloud.AuthOptions{ @@ -226,7 +230,11 @@ func (o *OpenStack) Start(_ telegraf.Accumulator) error { return nil } -func (o *OpenStack) Stop() {} +func (o *OpenStack) Stop() { + if o.client != nil { + o.client.CloseIdleConnections() + } +} // Gather gathers resources from the OpenStack API and accumulates metrics. This // implements the Input interface. diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index ee01425a1..57f806655 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -588,6 +588,10 @@ func (p *Prometheus) Start(_ telegraf.Accumulator) error { func (p *Prometheus) Stop() { p.cancel() p.wg.Wait() + + if p.client != nil { + p.client.CloseIdleConnections() + } } func init() { diff --git a/plugins/inputs/vault/vault.go b/plugins/inputs/vault/vault.go index 3ecdeb8cd..f80b3d6b4 100644 --- a/plugins/inputs/vault/vault.go +++ b/plugins/inputs/vault/vault.go @@ -70,6 +70,10 @@ func (n *Vault) Init() error { return nil } +func (n *Vault) Start(_ telegraf.Accumulator) error { + return nil +} + // Gather, collects metrics from Vault endpoint func (n *Vault) Gather(acc telegraf.Accumulator) error { sysMetrics, err := n.loadJSON(n.URL + "/v1/sys/metrics") @@ -80,6 +84,12 @@ func (n *Vault) Gather(acc telegraf.Accumulator) error { return buildVaultMetrics(acc, sysMetrics) } +func (n *Vault) Stop() { + if n.client != nil { + n.client.CloseIdleConnections() + } +} + func (n *Vault) loadJSON(url string) (*SysMetrics, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index d53b7ea1f..23bff0a24 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -5,6 +5,7 @@ import ( "context" _ "embed" "math" + "net/http" "sort" "strings" "time" @@ -30,6 +31,7 @@ type CloudWatch struct { Log telegraf.Logger `toml:"-"` internalaws.CredentialConfig httpconfig.HTTPClientConfig + client *http.Client } type statisticType int @@ -170,14 +172,20 @@ func (c *CloudWatch) Connect() error { return err } + c.client = client + c.svc = cloudwatch.NewFromConfig(cfg, func(options *cloudwatch.Options) { - options.HTTPClient = client + options.HTTPClient = c.client }) return nil } func (c *CloudWatch) Close() error { + if c.client != nil { + c.client.CloseIdleConnections() + } + return nil } diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index a6cdb1a8a..8e15020d1 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -102,6 +102,10 @@ func (h *HTTP) Connect() error { } func (h *HTTP) Close() error { + if h.client != nil { + h.client.CloseIdleConnections() + } + return nil } diff --git a/plugins/secretstores/http/http.go b/plugins/secretstores/http/http.go index 989193041..47e49ea82 100644 --- a/plugins/secretstores/http/http.go +++ b/plugins/secretstores/http/http.go @@ -11,6 +11,7 @@ import ( "io" "net/http" "strings" + "time" "github.com/blues/jsonata-go" @@ -23,6 +24,8 @@ import ( //go:embed sample.conf var sampleConfig string +const defaultIdleConnTimeoutMinutes = 5 + type HTTP struct { URL string `toml:"url"` Headers map[string]string `toml:"headers"` @@ -47,6 +50,12 @@ func (h *HTTP) SampleConfig() string { func (h *HTTP) Init() error { ctx := context.Background() + + // Prevent idle connections from hanging around forever on telegraf reload + if h.HTTPClientConfig.IdleConnTimeout == 0 { + h.HTTPClientConfig.IdleConnTimeout = config.Duration(defaultIdleConnTimeoutMinutes * time.Minute) + } + client, err := h.HTTPClientConfig.CreateClient(ctx, h.Log) if err != nil { return err