diff --git a/plugins/common/proxy/connect.go b/plugins/common/proxy/connect.go new file mode 100644 index 000000000..6c95ddf95 --- /dev/null +++ b/plugins/common/proxy/connect.go @@ -0,0 +1,140 @@ +package proxy + +import ( + "bufio" + "context" + "fmt" + "net" + "net/http" + "net/url" + + netProxy "golang.org/x/net/proxy" +) + +// httpConnectProxy proxies (only?) TCP over a HTTP tunnel using the CONNECT method +type httpConnectProxy struct { + forward netProxy.Dialer + url *url.URL +} + +func (c *httpConnectProxy) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + // Prevent using UDP + if network == "udp" { + return nil, fmt.Errorf("cannot proxy %q traffic over HTTP CONNECT", network) + } + + var proxyConn net.Conn + var err error + if dialer, ok := c.forward.(netProxy.ContextDialer); ok { + proxyConn, err = dialer.DialContext(ctx, "tcp", c.url.Host) + } else { + shim := contextDialerShim{c.forward} + proxyConn, err = shim.DialContext(ctx, "tcp", c.url.Host) + } + if err != nil { + return nil, err + } + + // Add and strip http:// to extract authority portion of the URL + // since CONNECT doesn't use a full URL. The request header would + // look something like: "CONNECT www.influxdata.com:443 HTTP/1.1" + requestURL, err := url.Parse("http://" + addr) + if err != nil { + if err := proxyConn.Close(); err != nil { + return nil, err + } + return nil, err + } + requestURL.Scheme = "" + + // Build HTTP CONNECT request + req, err := http.NewRequest(http.MethodConnect, requestURL.String(), nil) + if err != nil { + if err := proxyConn.Close(); err != nil { + return nil, err + } + return nil, err + } + req.Close = false + if password, hasAuth := c.url.User.Password(); hasAuth { + req.SetBasicAuth(c.url.User.Username(), password) + } + + err = req.Write(proxyConn) + if err != nil { + if err := proxyConn.Close(); err != nil { + return nil, err + } + return nil, err + } + + resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req) + if err != nil { + if err := proxyConn.Close(); err != nil { + return nil, err + } + return nil, err + } + if err := resp.Body.Close(); err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + if err := proxyConn.Close(); err != nil { + return nil, err + } + return nil, fmt.Errorf("failed to connect to proxy: %q", resp.Status) + } + + return proxyConn, nil +} + +func (c *httpConnectProxy) Dial(network, addr string) (net.Conn, error) { + return c.DialContext(context.Background(), network, addr) +} + +func newHTTPConnectProxy(proxyURL *url.URL, forward netProxy.Dialer) (netProxy.Dialer, error) { + return &httpConnectProxy{forward, proxyURL}, nil +} + +func init() { + // Register new proxy types + netProxy.RegisterDialerType("http", newHTTPConnectProxy) + netProxy.RegisterDialerType("https", newHTTPConnectProxy) +} + +// contextDialerShim allows cancellation of the dial from a context even if the underlying +// dialer does not implement `proxy.ContextDialer`. Arguably, this shouldn't actually get run, +// unless a new proxy type is added that doesn't implement `proxy.ContextDialer`, as all the +// standard library dialers implement `proxy.ContextDialer`. +type contextDialerShim struct { + dialer netProxy.Dialer +} + +func (cd *contextDialerShim) Dial(network, addr string) (net.Conn, error) { + return cd.dialer.Dial(network, addr) +} + +func (cd *contextDialerShim) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + var ( + conn net.Conn + done = make(chan struct{}, 1) + err error + ) + + go func() { + conn, err = cd.dialer.Dial(network, addr) + close(done) + if conn != nil && ctx.Err() != nil { + _ = conn.Close() + } + }() + + select { + case <-ctx.Done(): + err = ctx.Err() + case <-done: + } + + return conn, err +} diff --git a/plugins/common/proxy/dialer.go b/plugins/common/proxy/dialer.go new file mode 100644 index 000000000..844d12ac7 --- /dev/null +++ b/plugins/common/proxy/dialer.go @@ -0,0 +1,37 @@ +package proxy + +import ( + "context" + "net" + "time" + + netProxy "golang.org/x/net/proxy" +) + +type ProxiedDialer struct { + dialer netProxy.Dialer +} + +func (pd *ProxiedDialer) Dial(network, addr string) (net.Conn, error) { + return pd.dialer.Dial(network, addr) +} + +func (pd *ProxiedDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + if contextDialer, ok := pd.dialer.(netProxy.ContextDialer); ok { + return contextDialer.DialContext(ctx, network, addr) + } + + contextDialer := contextDialerShim{pd.dialer} + return contextDialer.DialContext(ctx, network, addr) +} + +func (pd *ProxiedDialer) DialTimeout(network, addr string, timeout time.Duration) (net.Conn, error) { + ctx := context.Background() + if timeout.Seconds() != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + return pd.DialContext(ctx, network, addr) +} diff --git a/plugins/common/proxy/proxy.go b/plugins/common/proxy/proxy.go index 00efbb7ae..39823ef0f 100644 --- a/plugins/common/proxy/proxy.go +++ b/plugins/common/proxy/proxy.go @@ -4,21 +4,54 @@ import ( "fmt" "net/http" "net/url" + + "golang.org/x/net/proxy" ) type HTTPProxy struct { - HTTPProxyURL string `toml:"http_proxy_url"` + UseSystemProxy bool `toml:"use_system_proxy"` + HTTPProxyURL string `toml:"http_proxy_url"` } type proxyFunc func(req *http.Request) (*url.URL, error) func (p *HTTPProxy) Proxy() (proxyFunc, error) { - if len(p.HTTPProxyURL) > 0 { + if p.UseSystemProxy { + return http.ProxyFromEnvironment, nil + } else if len(p.HTTPProxyURL) > 0 { address, err := url.Parse(p.HTTPProxyURL) if err != nil { return nil, fmt.Errorf("error parsing proxy url %q: %w", p.HTTPProxyURL, err) } return http.ProxyURL(address), nil } - return http.ProxyFromEnvironment, nil + + return nil, nil +} + +type TCPProxy struct { + UseProxy bool `toml:"use_proxy"` + ProxyURL string `toml:"proxy_url"` +} + +func (p *TCPProxy) Proxy() (*ProxiedDialer, error) { + var dialer proxy.Dialer + if p.UseProxy { + if len(p.ProxyURL) > 0 { + parsed, err := url.Parse(p.ProxyURL) + if err != nil { + return nil, fmt.Errorf("error parsing proxy url %q: %w", p.ProxyURL, err) + } + + if dialer, err = proxy.FromURL(parsed, proxy.Direct); err != nil { + return nil, err + } + } else { + dialer = proxy.FromEnvironment() + } + } else { + dialer = proxy.Direct + } + + return &ProxiedDialer{dialer}, nil } diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md index befa7591b..052209250 100644 --- a/plugins/inputs/cloudwatch/README.md +++ b/plugins/inputs/cloudwatch/README.md @@ -46,7 +46,8 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## ex: endpoint_url = "http://localhost:8000" # endpoint_url = "" - ## Set http_proxy (telegraf uses the system wide proxy settings if it's is not set) + ## Set http_proxy + # use_system_proxy = false # http_proxy_url = "http://localhost:8888" # The minimum period for Cloudwatch metrics is 1 minute (60s). However not all diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 9672ff88a..939f146de 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -3,6 +3,7 @@ package cloudwatch import ( "context" "net/http" + "net/url" "testing" "time" @@ -360,13 +361,18 @@ func TestUpdateWindow(t *testing.T) { func TestProxyFunction(t *testing.T) { c := &CloudWatch{ - HTTPProxy: proxy.HTTPProxy{HTTPProxyURL: "http://www.penguins.com"}, + HTTPProxy: proxy.HTTPProxy{ + HTTPProxyURL: "http://www.penguins.com", + }, } proxyFunction, err := c.HTTPProxy.Proxy() require.NoError(t, err) - proxyResult, err := proxyFunction(&http.Request{}) + u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/") + require.NoError(t, err) + + proxyResult, err := proxyFunction(&http.Request{URL: u}) require.NoError(t, err) require.Equal(t, "www.penguins.com", proxyResult.Host) } diff --git a/plugins/inputs/x509_cert/README.md b/plugins/inputs/x509_cert/README.md index d48059ded..51560e4b5 100644 --- a/plugins/inputs/x509_cert/README.md +++ b/plugins/inputs/x509_cert/README.md @@ -32,6 +32,10 @@ When using a UDP address as a certificate source, the server must support # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" # tls_server_name = "myhost.example.org" + + ## Set the proxy URL + # use_proxy = true + # proxy_url = "http://localhost:8888" ``` ## Metrics diff --git a/plugins/inputs/x509_cert/x509_cert.go b/plugins/inputs/x509_cert/x509_cert.go index ece23693e..ada4f89b6 100644 --- a/plugins/inputs/x509_cert/x509_cert.go +++ b/plugins/inputs/x509_cert/x509_cert.go @@ -22,6 +22,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/common/proxy" _tls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -38,6 +39,7 @@ type X509Cert struct { ExcludeRootCerts bool `toml:"exclude_root_certs"` tlsCfg *tls.Config _tls.ClientConfig + proxy.TCPProxy locations []*url.URL globpaths []*globpath.GlobPath Log telegraf.Logger @@ -126,7 +128,12 @@ func (c *X509Cert) getCert(u *url.URL, timeout time.Duration) ([]*x509.Certifica protocol = "tcp" fallthrough case "tcp", "tcp4", "tcp6": - ipConn, err := net.DialTimeout(protocol, u.Host, timeout) + dialer, err := c.Proxy() + if err != nil { + return nil, err + } + + ipConn, err := dialer.DialTimeout(protocol, u.Host, timeout) if err != nil { return nil, err } diff --git a/plugins/inputs/x509_cert/x509_cert_test.go b/plugins/inputs/x509_cert/x509_cert_test.go index 9c91701fe..503209eb5 100644 --- a/plugins/inputs/x509_cert/x509_cert_test.go +++ b/plugins/inputs/x509_cert/x509_cert_test.go @@ -6,6 +6,8 @@ import ( "fmt" "math/big" "net" + "net/http" + "net/http/httptest" "net/url" "os" "path/filepath" @@ -320,6 +322,25 @@ func TestGatherUDPCertIntegration(t *testing.T) { require.True(t, acc.HasMeasurement("x509_cert")) } +func TestGatherTCPCert(t *testing.T) { + ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + m := &X509Cert{ + Sources: []string{ts.URL}, + Log: testutil.Logger{}, + } + require.NoError(t, m.Init()) + + var acc testutil.Accumulator + require.NoError(t, m.Gather(&acc)) + + require.Len(t, acc.Errors, 0) + require.True(t, acc.HasMeasurement("x509_cert")) +} + func TestGatherCertIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -327,6 +348,7 @@ func TestGatherCertIntegration(t *testing.T) { m := &X509Cert{ Sources: []string{"https://www.influxdata.com:443"}, + Log: testutil.Logger{}, } require.NoError(t, m.Init()) @@ -343,6 +365,7 @@ func TestGatherCertMustNotTimeoutIntegration(t *testing.T) { duration := time.Duration(15) * time.Second m := &X509Cert{ Sources: []string{"https://www.influxdata.com:443"}, + Log: testutil.Logger{}, Timeout: config.Duration(duration), } require.NoError(t, m.Init()) diff --git a/plugins/outputs/datadog/README.md b/plugins/outputs/datadog/README.md index f17ee6f5e..2afef5dae 100644 --- a/plugins/outputs/datadog/README.md +++ b/plugins/outputs/datadog/README.md @@ -17,7 +17,8 @@ This plugin writes to the [Datadog Metrics API][metrics] and requires an ## Write URL override; useful for debugging. # url = "https://app.datadoghq.com/api/v1/series" - ## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set) + ## Set http_proxy + # use_system_proxy = false # http_proxy_url = "http://localhost:8888" ## Override the default (none) compression used to send data. diff --git a/plugins/outputs/websocket/README.md b/plugins/outputs/websocket/README.md index 63e4aed2f..4f5eb6956 100644 --- a/plugins/outputs/websocket/README.md +++ b/plugins/outputs/websocket/README.md @@ -35,6 +35,10 @@ It can output data in any of the [supported output formats][formats]. # socks5_username = "alice" # socks5_password = "pass123" + ## Optional HTTP proxy to use + # use_system_proxy = false + # http_proxy_url = "http://localhost:8888" + ## Data format to output. ## Each data format has it's own unique set of configuration options, read ## more about them here: