feat(outputs.amqp): Add proxy support (#11649)
This commit is contained in:
parent
f56bc051b4
commit
35803efd2f
|
|
@ -90,6 +90,10 @@ For an introduction to AMQP see:
|
||||||
## Use TLS but skip chain & host verification
|
## Use TLS but skip chain & host verification
|
||||||
# insecure_skip_verify = false
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Optional Proxy Configuration
|
||||||
|
# use_proxy = false
|
||||||
|
# proxy_url = "localhost:8888"
|
||||||
|
|
||||||
## If true use batch serialization format instead of line based delimiting.
|
## If true use batch serialization format instead of line based delimiting.
|
||||||
## Only applies to data formats which are not line based such as JSON.
|
## Only applies to data formats which are not line based such as JSON.
|
||||||
## Recommended to set to true.
|
## Recommended to set to true.
|
||||||
|
|
@ -120,3 +124,10 @@ Exchange types that do not use a routing key, `direct` and `header`, always use
|
||||||
the empty string as the routing key.
|
the empty string as the routing key.
|
||||||
|
|
||||||
Metrics are published in batches based on the final routing key.
|
Metrics are published in batches based on the final routing key.
|
||||||
|
|
||||||
|
### Proxy
|
||||||
|
|
||||||
|
If you want to use a proxy, you need to set `use_proxy = true`. This will
|
||||||
|
use the system's proxy settings to determine the proxy URL. If you need to
|
||||||
|
specify a proxy URL manually, you can do so by using `proxy_url`, overriding
|
||||||
|
the system settings.
|
||||||
|
|
|
||||||
|
|
@ -12,12 +12,14 @@ import (
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
|
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
|
||||||
|
//
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
|
|
@ -63,6 +65,7 @@ type AMQP struct {
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
proxy.TCPProxy
|
||||||
|
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
connect func(*ClientConfig) (Client, error)
|
connect func(*ClientConfig) (Client, error)
|
||||||
|
|
@ -283,6 +286,12 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
||||||
}
|
}
|
||||||
clientConfig.tlsConfig = tlsConfig
|
clientConfig.tlsConfig = tlsConfig
|
||||||
|
|
||||||
|
dialer, err := q.TCPProxy.Proxy()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
clientConfig.dialer = dialer
|
||||||
|
|
||||||
var auth []amqp.Authentication
|
var auth []amqp.Authentication
|
||||||
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
|
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
|
||||||
auth = []amqp.Authentication{&externalAuth{}}
|
auth = []amqp.Authentication{&externalAuth{}}
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
|
|
@ -26,6 +27,7 @@ type ClientConfig struct {
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
auth []amqp.Authentication
|
auth []amqp.Authentication
|
||||||
|
dialer *proxy.ProxiedDialer
|
||||||
log telegraf.Logger
|
log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -50,7 +52,7 @@ func newClient(config *ClientConfig) (*client, error) {
|
||||||
TLSClientConfig: config.tlsConfig,
|
TLSClientConfig: config.tlsConfig,
|
||||||
SASL: config.auth, // if nil, it will be PLAIN taken from url
|
SASL: config.auth, // if nil, it will be PLAIN taken from url
|
||||||
Dial: func(network, addr string) (net.Conn, error) {
|
Dial: func(network, addr string) (net.Conn, error) {
|
||||||
return net.DialTimeout(network, addr, config.timeout)
|
return config.dialer.DialTimeout(network, addr, config.timeout)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,10 @@
|
||||||
## Use TLS but skip chain & host verification
|
## Use TLS but skip chain & host verification
|
||||||
# insecure_skip_verify = false
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Optional Proxy Configuration
|
||||||
|
# use_proxy = false
|
||||||
|
# proxy_url = "localhost:8888"
|
||||||
|
|
||||||
## If true use batch serialization format instead of line based delimiting.
|
## If true use batch serialization format instead of line based delimiting.
|
||||||
## Only applies to data formats which are not line based such as JSON.
|
## Only applies to data formats which are not line based such as JSON.
|
||||||
## Recommended to set to true.
|
## Recommended to set to true.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue