feat(inputs.prometheus): use system wide proxy settings (#11729)

This commit is contained in:
erwiese 2022-09-13 19:48:13 +02:00 committed by GitHub
parent d67f75e557
commit 215e8e030a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 53 additions and 42 deletions

View File

@ -91,6 +91,10 @@ in Prometheus format.
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## HTTP Proxy support
# use_system_proxy = false
# http_proxy_url = ""
## Optional TLS Config
# tls_ca = /path/to/cafile

View File

@ -21,7 +21,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
parserV2 "github.com/influxdata/telegraf/plugins/parsers/prometheus"
)
@ -66,10 +66,10 @@ type Prometheus struct {
IgnoreTimestamp bool `toml:"ignore_timestamp"`
tls.ClientConfig
Log telegraf.Logger
httpconfig.HTTPClientConfig
client *http.Client
headers map[string]string
@ -138,6 +138,17 @@ func (p *Prometheus) Init() error {
p.Log.Infof("Using the label selector: %v and field selector: %v", p.podLabelSelector, p.podFieldSelector)
}
ctx := context.Background()
client, err := p.HTTPClientConfig.CreateClient(ctx, p.Log)
if err != nil {
return err
}
p.client = client
p.headers = map[string]string{
"User-Agent": internal.ProductToken(),
"Accept": acceptHeader,
}
return nil
}
@ -215,18 +226,6 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHTTPClient()
if err != nil {
return err
}
p.client = client
p.headers = map[string]string{
"User-Agent": internal.ProductToken(),
"Accept": acceptHeader,
}
}
var wg sync.WaitGroup
allURLs, err := p.GetAllURLs()
@ -246,23 +245,6 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *Prometheus) createHTTPClient() (*http.Client, error) {
tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: time.Duration(p.ResponseTimeout),
}
return client, nil
}
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
var req *http.Request
var err error
@ -280,7 +262,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
}
// ignore error because it's been handled before getting here
tlsCfg, _ := p.ClientConfig.TLSConfig()
tlsCfg, _ := p.HTTPClientConfig.TLSConfig()
uClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,

View File

@ -60,10 +60,12 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
URLs: []string{ts.URL},
URLTag: "url",
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
require.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
@ -86,12 +88,15 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
KubernetesServices: []string{ts.URL},
URLTag: "url",
}
err := p.Init()
require.NoError(t, err)
u, _ := url.Parse(ts.URL)
tsAddress := u.Hostname()
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
require.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
@ -118,10 +123,12 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFailsIntegration(t *testing.T
URLs: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
require.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
@ -142,10 +149,12 @@ func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
URLTag: "url",
MetricVersion: 2,
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
require.True(t, acc.TagSetValue("prometheus", "quantile") == "0")
@ -173,10 +182,12 @@ go_gc_duration_seconds_count 42`
URLTag: "",
MetricVersion: 2,
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := p.Gather(&acc)
err = p.Gather(&acc)
require.NoError(t, err)
expected := []telegraf.Metric{
@ -230,10 +241,12 @@ func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
URLTag: "url",
MetricVersion: 2,
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
require.True(t, acc.HasFloatField("prometheus", "go_goroutines"))
@ -254,10 +267,12 @@ func TestPrometheusGeneratesMetricsWithIgnoreTimestamp(t *testing.T) {
URLTag: "url",
IgnoreTimestamp: true,
}
err := p.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
err = acc.GatherError(p.Gather)
require.NoError(t, err)
m, _ := acc.Get("test_metric")

View File

@ -83,6 +83,10 @@
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## HTTP Proxy support
# use_system_proxy = false
# http_proxy_url = ""
## Optional TLS Config
# tls_ca = /path/to/cafile

View File

@ -364,8 +364,11 @@ rpc_duration_seconds_count 2693
URLTag: "",
MetricVersion: 1,
}
err := input.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := input.Start(&acc)
err = input.Start(&acc)
require.NoError(t, err)
err = input.Gather(&acc)
require.NoError(t, err)

View File

@ -424,8 +424,11 @@ rpc_duration_seconds_count 2693
URLTag: "",
MetricVersion: 2,
}
err := input.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err := input.Start(&acc)
err = input.Start(&acc)
require.NoError(t, err)
err = input.Gather(&acc)
require.NoError(t, err)