feat: Use HTTPClientConfig struct in elastic stack plugins (#14207)

This commit is contained in:
Óscar Erades 2023-11-13 23:53:36 +01:00 committed by GitHub
parent fd65ce80d8
commit 59f53c0302
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 121 additions and 78 deletions

View File

@ -47,6 +47,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
servers = ["http://localhost:9200"] servers = ["http://localhost:9200"]
## Timeout for HTTP requests to the elastic search server(s) ## Timeout for HTTP requests to the elastic search server(s)
## deprecated in 1.29.0; use 'timeout' instead
http_timeout = "5s" http_timeout = "5s"
## When local is true (the default), the node will read only its own stats. ## When local is true (the default), the node will read only its own stats.
@ -94,6 +95,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
## Sets the number of most recent indices to return for indices that are ## Sets the number of most recent indices to return for indices that are
## configured with a date-stamped suffix. Each 'indices_include' entry ## configured with a date-stamped suffix. Each 'indices_include' entry

View File

@ -2,6 +2,7 @@
package elasticsearch package elasticsearch
import ( import (
"context"
_ "embed" _ "embed"
"encoding/json" "encoding/json"
"errors" "errors"
@ -17,7 +18,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/common/tls" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@ -97,7 +98,7 @@ type indexStat struct {
type Elasticsearch struct { type Elasticsearch struct {
Local bool `toml:"local"` Local bool `toml:"local"`
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
HTTPTimeout config.Duration `toml:"http_timeout"` HTTPTimeout config.Duration `toml:"http_timeout" deprecated:"1.29.0;use 'timeout' instead"`
ClusterHealth bool `toml:"cluster_health"` ClusterHealth bool `toml:"cluster_health"`
ClusterHealthLevel string `toml:"cluster_health_level"` ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"` ClusterStats bool `toml:"cluster_stats"`
@ -109,9 +110,11 @@ type Elasticsearch struct {
Password string `toml:"password"` Password string `toml:"password"`
NumMostRecentIndices int `toml:"num_most_recent_indices"` NumMostRecentIndices int `toml:"num_most_recent_indices"`
tls.ClientConfig Log telegraf.Logger `toml:"-"`
client *http.Client
httpconfig.HTTPClientConfig
client *http.Client
serverInfo map[string]serverInfo serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex serverInfoMutex sync.Mutex
indexMatchers map[string]filter.Filter indexMatchers map[string]filter.Filter
@ -128,9 +131,12 @@ func (i serverInfo) isMaster() bool {
// NewElasticsearch return a new instance of Elasticsearch // NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch { func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{ return &Elasticsearch{
HTTPTimeout: config.Duration(time.Second * 5),
ClusterStatsOnlyFromMaster: true, ClusterStatsOnlyFromMaster: true,
ClusterHealthLevel: "indices", ClusterHealthLevel: "indices",
HTTPClientConfig: httpconfig.HTTPClientConfig{
ResponseHeaderTimeout: config.Duration(5 * time.Second),
Timeout: config.Duration(5 * time.Second),
},
} }
} }
@ -277,20 +283,12 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
} }
func (e *Elasticsearch) createHTTPClient() (*http.Client, error) { func (e *Elasticsearch) createHTTPClient() (*http.Client, error) {
tlsCfg, err := e.ClientConfig.TLSConfig() ctx := context.Background()
if err != nil { if e.HTTPTimeout != 0 {
return nil, err e.HTTPClientConfig.Timeout = e.HTTPTimeout
e.HTTPClientConfig.ResponseHeaderTimeout = e.HTTPTimeout
} }
tr := &http.Transport{ return e.HTTPClientConfig.CreateClient(ctx, e.Log)
ResponseHeaderTimeout: time.Duration(e.HTTPTimeout),
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(e.HTTPTimeout),
}
return client, nil
} }
func (e *Elasticsearch) nodeStatsURL(baseURL string) string { func (e *Elasticsearch) nodeStatsURL(baseURL string) string {

View File

@ -6,6 +6,7 @@
servers = ["http://localhost:9200"] servers = ["http://localhost:9200"]
## Timeout for HTTP requests to the elastic search server(s) ## Timeout for HTTP requests to the elastic search server(s)
## deprecated in 1.29.0; use 'timeout' instead
http_timeout = "5s" http_timeout = "5s"
## When local is true (the default), the node will read only its own stats. ## When local is true (the default), the node will read only its own stats.
@ -53,6 +54,13 @@
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
## Sets the number of most recent indices to return for indices that are ## Sets the number of most recent indices to return for indices that are
## configured with a date-stamped suffix. Each 'indices_include' entry ## configured with a date-stamped suffix. Each 'indices_include' entry

View File

@ -55,6 +55,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
[[inputs.elasticsearch_query.aggregation]] [[inputs.elasticsearch_query.aggregation]]
## measurement name for the results of the aggregation query ## measurement name for the results of the aggregation query

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -28,15 +28,15 @@ type ElasticsearchQuery struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
EnableSniffer bool `toml:"enable_sniffer"` EnableSniffer bool `toml:"enable_sniffer"`
Timeout config.Duration `toml:"timeout"`
HealthCheckInterval config.Duration `toml:"health_check_interval"` HealthCheckInterval config.Duration `toml:"health_check_interval"`
Aggregations []esAggregation `toml:"aggregation"` Aggregations []esAggregation `toml:"aggregation"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
tls.ClientConfig
httpclient *http.Client httpclient *http.Client
esClient *elastic5.Client httpconfig.HTTPClientConfig
esClient *elastic5.Client
} }
// esAggregation struct // esAggregation struct
@ -197,20 +197,8 @@ func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
} }
func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) { func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) {
tlsCfg, err := e.ClientConfig.TLSConfig() ctx := context.Background()
if err != nil { return e.HTTPClientConfig.CreateClient(ctx, e.Log)
return nil, err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(e.Timeout),
TLSClientConfig: tlsCfg,
}
httpclient := &http.Client{
Transport: tr,
Timeout: time.Duration(e.Timeout),
}
return httpclient, nil
} }
func (e *ElasticsearchQuery) esAggregationQuery(acc telegraf.Accumulator, aggregation esAggregation, i int) error { func (e *ElasticsearchQuery) esAggregationQuery(acc telegraf.Accumulator, aggregation esAggregation, i int) error {
@ -242,8 +230,11 @@ func (e *ElasticsearchQuery) esAggregationQuery(acc telegraf.Accumulator, aggreg
func init() { func init() {
inputs.Add("elasticsearch_query", func() telegraf.Input { inputs.Add("elasticsearch_query", func() telegraf.Input {
return &ElasticsearchQuery{ return &ElasticsearchQuery{
Timeout: config.Duration(time.Second * 5),
HealthCheckInterval: config.Duration(time.Second * 10), HealthCheckInterval: config.Duration(time.Second * 10),
HTTPClientConfig: httpconfig.HTTPClientConfig{
ResponseHeaderTimeout: config.Duration(5 * time.Second),
Timeout: config.Duration(5 * time.Second),
},
} }
}) })
} }

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -536,9 +537,12 @@ func setupIntegrationTest(t *testing.T) (*testutil.Container, error) {
"http://%s:%s", container.Address, container.Ports[servicePort], "http://%s:%s", container.Address, container.Ports[servicePort],
) )
e := &ElasticsearchQuery{ e := &ElasticsearchQuery{
URLs: []string{url}, URLs: []string{url},
Timeout: config.Duration(time.Second * 30), HTTPClientConfig: httpconfig.HTTPClientConfig{
Log: testutil.Logger{}, ResponseHeaderTimeout: config.Duration(30 * time.Second),
Timeout: config.Duration(30 * time.Second),
},
Log: testutil.Logger{},
} }
err = e.connectToES() err = e.connectToES()
@ -612,8 +616,11 @@ func TestElasticsearchQueryIntegration(t *testing.T) {
URLs: []string{ URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}, },
Timeout: config.Duration(time.Second * 30), HTTPClientConfig: httpconfig.HTTPClientConfig{
Log: testutil.Logger{}, ResponseHeaderTimeout: config.Duration(30 * time.Second),
Timeout: config.Duration(30 * time.Second),
},
Log: testutil.Logger{},
} }
err = e.connectToES() err = e.connectToES()
@ -675,8 +682,11 @@ func TestElasticsearchQueryIntegration_getMetricFields(t *testing.T) {
URLs: []string{ URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}, },
Timeout: config.Duration(time.Second * 30), HTTPClientConfig: httpconfig.HTTPClientConfig{
Log: testutil.Logger{}, ResponseHeaderTimeout: config.Duration(30 * time.Second),
Timeout: config.Duration(30 * time.Second),
},
Log: testutil.Logger{},
} }
err = e.connectToES() err = e.connectToES()

View File

@ -26,6 +26,13 @@
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
[[inputs.elasticsearch_query.aggregation]] [[inputs.elasticsearch_query.aggregation]]
## measurement name for the results of the aggregation query ## measurement name for the results of the aggregation query

View File

@ -37,6 +37,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
``` ```
## Metrics ## Metrics

View File

@ -2,6 +2,7 @@
package kibana package kibana
import ( import (
"context"
_ "embed" _ "embed"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -14,7 +15,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -90,15 +91,18 @@ type Kibana struct {
Servers []string Servers []string
Username string Username string
Password string Password string
Timeout config.Duration
tls.ClientConfig Log telegraf.Logger `toml:"-"`
client *http.Client client *http.Client
httpconfig.HTTPClientConfig
} }
func NewKibana() *Kibana { func NewKibana() *Kibana {
return &Kibana{ return &Kibana{
Timeout: config.Duration(time.Second * 5), HTTPClientConfig: httpconfig.HTTPClientConfig{
Timeout: config.Duration(5 * time.Second),
},
} }
} }
@ -147,19 +151,8 @@ func (k *Kibana) Gather(acc telegraf.Accumulator) error {
} }
func (k *Kibana) createHTTPClient() (*http.Client, error) { func (k *Kibana) createHTTPClient() (*http.Client, error) {
tlsCfg, err := k.ClientConfig.TLSConfig() ctx := context.Background()
if err != nil { return k.HTTPClientConfig.CreateClient(ctx, k.Log)
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(k.Timeout),
}
return client, nil
} }
func (k *Kibana) gatherKibanaStatus(baseURL string, acc telegraf.Accumulator) error { func (k *Kibana) gatherKibanaStatus(baseURL string, acc telegraf.Accumulator) error {

View File

@ -16,3 +16,10 @@
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"

View File

@ -44,6 +44,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Use TLS but skip chain & host verification. ## Use TLS but skip chain & host verification.
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
## Optional HTTP headers. ## Optional HTTP headers.
# [inputs.logstash.headers] # [inputs.logstash.headers]

View File

@ -2,6 +2,7 @@
package logstash package logstash
import ( import (
"context"
_ "embed" _ "embed"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -14,7 +15,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/tls" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonParser "github.com/influxdata/telegraf/plugins/parsers/json" jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@ -31,10 +32,11 @@ type Logstash struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Headers map[string]string `toml:"headers"` Headers map[string]string `toml:"headers"`
Timeout config.Duration `toml:"timeout"`
tls.ClientConfig Log telegraf.Logger `toml:"-"`
client *http.Client client *http.Client
httpconfig.HTTPClientConfig
} }
// NewLogstash create an instance of the plugin with default settings // NewLogstash create an instance of the plugin with default settings
@ -44,7 +46,9 @@ func NewLogstash() *Logstash {
SinglePipeline: false, SinglePipeline: false,
Collect: []string{"pipelines", "process", "jvm"}, Collect: []string{"pipelines", "process", "jvm"},
Headers: make(map[string]string), Headers: make(map[string]string),
Timeout: config.Duration(time.Second * 5), HTTPClientConfig: httpconfig.HTTPClientConfig{
Timeout: config.Duration(5 * time.Second),
},
} }
} }
@ -131,19 +135,8 @@ func (logstash *Logstash) Init() error {
// createHTTPClient create a clients to access API // createHTTPClient create a clients to access API
func (logstash *Logstash) createHTTPClient() (*http.Client, error) { func (logstash *Logstash) createHTTPClient() (*http.Client, error) {
tlsConfig, err := logstash.ClientConfig.TLSConfig() ctx := context.Background()
if err != nil { return logstash.HTTPClientConfig.CreateClient(ctx, logstash.Log)
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: time.Duration(logstash.Timeout),
}
return client, nil
} }
// gatherJSONData query the data source and parse the response JSON // gatherJSONData query the data source and parse the response JSON

View File

@ -25,6 +25,13 @@
## Use TLS but skip chain & host verification. ## Use TLS but skip chain & host verification.
# insecure_skip_verify = false # insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is
## provided, Telegraf will use the specified URL as HTTP proxy.
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"
## Optional HTTP headers. ## Optional HTTP headers.
# [inputs.logstash.headers] # [inputs.logstash.headers]