fix(http): Stop plugins from leaking file descriptors on telegraf reload (#15213)

This commit is contained in:
Nick Thomas 2024-04-24 21:18:15 +01:00 committed by GitHub
parent 1e57608a21
commit 96d6da63f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 99 additions and 3 deletions

View File

@ -353,6 +353,9 @@ func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) {
func (c *CtrlXDataLayer) Stop() {
c.cancel()
c.wg.Wait()
if c.connection != nil {
c.connection.CloseIdleConnections()
}
}
// Gather is called by telegraf to collect the metrics.

View File

@ -185,6 +185,10 @@ func (e *Elasticsearch) Init() error {
return nil
}
func (e *Elasticsearch) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
@ -282,6 +286,12 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
return nil
}
func (e *Elasticsearch) Stop() {
if e.client != nil {
e.client.CloseIdleConnections()
}
}
func (e *Elasticsearch) createHTTPClient() (*http.Client, error) {
ctx := context.Background()
if e.HTTPTimeout != 0 {

View File

@ -173,6 +173,10 @@ func (e *ElasticsearchQuery) connectToES() error {
return nil
}
func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather writes the results of the queries from Elasticsearch to the Accumulator.
func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
@ -197,6 +201,12 @@ func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
return nil
}
func (e *ElasticsearchQuery) Stop() {
if e.httpclient != nil {
e.httpclient.CloseIdleConnections()
}
}
func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) {
ctx := context.Background()
return e.HTTPClientConfig.CreateClient(ctx, e.Log)

View File

@ -80,6 +80,10 @@ func (h *HTTP) Init() error {
return nil
}
func (h *HTTP) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
@ -99,6 +103,12 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error {
return nil
}
func (h *HTTP) Stop() {
if h.client != nil {
h.client.CloseIdleConnections()
}
}
// SetParserFunc takes the data_format from the config and finds the right parser for that format
func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) {
h.parserFunc = fn

View File

@ -123,6 +123,10 @@ func (*Kibana) SampleConfig() string {
return sampleConfig
}
func (k *Kibana) Start(_ telegraf.Accumulator) error {
return nil
}
func (k *Kibana) Gather(acc telegraf.Accumulator) error {
if k.client == nil {
client, err := k.createHTTPClient()
@ -150,6 +154,12 @@ func (k *Kibana) Gather(acc telegraf.Accumulator) error {
return nil
}
func (k *Kibana) Stop() {
if k.client != nil {
k.client.CloseIdleConnections()
}
}
func (k *Kibana) createHTTPClient() (*http.Client, error) {
ctx := context.Background()
return k.HTTPClientConfig.CreateClient(ctx, k.Log)

View File

@ -454,6 +454,10 @@ func (logstash *Logstash) gatherPipelinesStats(address string, accumulator teleg
return nil
}
func (logstash *Logstash) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather ask this plugin to start gathering metrics
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
if logstash.client == nil {
@ -508,6 +512,12 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
return nil
}
func (logstash *Logstash) Stop() {
if logstash.client != nil {
logstash.client.CloseIdleConnections()
}
}
// init registers this plugin instance
func init() {
inputs.Add("logstash", func() telegraf.Input {

View File

@ -15,6 +15,7 @@ import (
_ "embed"
"errors"
"fmt"
"net/http"
"regexp"
"sort"
"strconv"
@ -79,6 +80,8 @@ type OpenStack struct {
Log telegraf.Logger `toml:"-"`
httpconfig.HTTPClientConfig
client *http.Client
// Locally cached clients
identity *gophercloud.ServiceClient
compute *gophercloud.ServiceClient
@ -153,7 +156,8 @@ func (o *OpenStack) Start(_ telegraf.Accumulator) error {
return err
}
provider.HTTPClient = *client
o.client = client
provider.HTTPClient = *o.client
// Authenticate to the endpoint
authOption := gophercloud.AuthOptions{
@ -226,7 +230,11 @@ func (o *OpenStack) Start(_ telegraf.Accumulator) error {
return nil
}
func (o *OpenStack) Stop() {}
func (o *OpenStack) Stop() {
if o.client != nil {
o.client.CloseIdleConnections()
}
}
// Gather gathers resources from the OpenStack API and accumulates metrics. This
// implements the Input interface.

View File

@ -588,6 +588,10 @@ func (p *Prometheus) Start(_ telegraf.Accumulator) error {
func (p *Prometheus) Stop() {
p.cancel()
p.wg.Wait()
if p.client != nil {
p.client.CloseIdleConnections()
}
}
func init() {

View File

@ -70,6 +70,10 @@ func (n *Vault) Init() error {
return nil
}
func (n *Vault) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather, collects metrics from Vault endpoint
func (n *Vault) Gather(acc telegraf.Accumulator) error {
sysMetrics, err := n.loadJSON(n.URL + "/v1/sys/metrics")
@ -80,6 +84,12 @@ func (n *Vault) Gather(acc telegraf.Accumulator) error {
return buildVaultMetrics(acc, sysMetrics)
}
func (n *Vault) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}
func (n *Vault) loadJSON(url string) (*SysMetrics, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"context"
_ "embed"
"math"
"net/http"
"sort"
"strings"
"time"
@ -30,6 +31,7 @@ type CloudWatch struct {
Log telegraf.Logger `toml:"-"`
internalaws.CredentialConfig
httpconfig.HTTPClientConfig
client *http.Client
}
type statisticType int
@ -170,14 +172,20 @@ func (c *CloudWatch) Connect() error {
return err
}
c.client = client
c.svc = cloudwatch.NewFromConfig(cfg, func(options *cloudwatch.Options) {
options.HTTPClient = client
options.HTTPClient = c.client
})
return nil
}
func (c *CloudWatch) Close() error {
if c.client != nil {
c.client.CloseIdleConnections()
}
return nil
}

View File

@ -102,6 +102,10 @@ func (h *HTTP) Connect() error {
}
func (h *HTTP) Close() error {
if h.client != nil {
h.client.CloseIdleConnections()
}
return nil
}

View File

@ -11,6 +11,7 @@ import (
"io"
"net/http"
"strings"
"time"
"github.com/blues/jsonata-go"
@ -23,6 +24,8 @@ import (
//go:embed sample.conf
var sampleConfig string
const defaultIdleConnTimeoutMinutes = 5
type HTTP struct {
URL string `toml:"url"`
Headers map[string]string `toml:"headers"`
@ -47,6 +50,12 @@ func (h *HTTP) SampleConfig() string {
func (h *HTTP) Init() error {
ctx := context.Background()
// Prevent idle connections from hanging around forever on telegraf reload
if h.HTTPClientConfig.IdleConnTimeout == 0 {
h.HTTPClientConfig.IdleConnTimeout = config.Duration(defaultIdleConnTimeoutMinutes * time.Minute)
}
client, err := h.HTTPClientConfig.CreateClient(ctx, h.Log)
if err != nil {
return err