feat: add socks5 proxy support for kafka output plugin (#8192)
This commit is contained in:
parent
c1a41383c6
commit
0d529d89ea
1
go.mod
1
go.mod
|
|
@ -26,6 +26,7 @@ require (
|
||||||
github.com/antchfx/xpath v1.2.0
|
github.com/antchfx/xpath v1.2.0
|
||||||
github.com/apache/thrift v0.15.0
|
github.com/apache/thrift v0.15.0
|
||||||
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
|
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
|
||||||
|
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
|
||||||
github.com/aws/aws-sdk-go-v2 v1.13.0
|
github.com/aws/aws-sdk-go-v2 v1.13.0
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.8.3
|
github.com/aws/aws-sdk-go-v2/config v1.8.3
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.4.3
|
github.com/aws/aws-sdk-go-v2/credentials v1.4.3
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -291,6 +291,8 @@ github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+r
|
||||||
github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
|
github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
|
||||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
|
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
|
||||||
|
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
|
||||||
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
|
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
|
||||||
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
||||||
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/net/proxy"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Socks5ProxyConfig struct {
|
||||||
|
Socks5ProxyEnabled bool `toml:"socks5_enabled"`
|
||||||
|
Socks5ProxyAddress string `toml:"socks5_address"`
|
||||||
|
Socks5ProxyUsername string `toml:"socks5_username"`
|
||||||
|
Socks5ProxyPassword string `toml:"socks5_password"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Socks5ProxyConfig) GetDialer() (proxy.Dialer, error) {
|
||||||
|
var auth *proxy.Auth
|
||||||
|
if c.Socks5ProxyPassword != "" || c.Socks5ProxyUsername != "" {
|
||||||
|
auth = new(proxy.Auth)
|
||||||
|
auth.User = c.Socks5ProxyUsername
|
||||||
|
auth.Password = c.Socks5ProxyPassword
|
||||||
|
}
|
||||||
|
return proxy.SOCKS5("tcp", c.Socks5ProxyAddress, auth, proxy.Direct)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,70 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-socks5"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSocks5ProxyConfig(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
proxyAddress = "0.0.0.0:12345"
|
||||||
|
proxyUsername = "user"
|
||||||
|
proxyPassword = "password"
|
||||||
|
)
|
||||||
|
|
||||||
|
l, err := net.Listen("tcp", "0.0.0.0:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
server, err := socks5.New(&socks5.Config{
|
||||||
|
AuthMethods: []socks5.Authenticator{socks5.UserPassAuthenticator{
|
||||||
|
Credentials: socks5.StaticCredentials{
|
||||||
|
proxyUsername: proxyPassword,
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
go func() { require.NoError(t, server.ListenAndServe("tcp", proxyAddress)) }()
|
||||||
|
|
||||||
|
conf := Socks5ProxyConfig{
|
||||||
|
Socks5ProxyEnabled: true,
|
||||||
|
Socks5ProxyAddress: proxyAddress,
|
||||||
|
Socks5ProxyUsername: proxyUsername,
|
||||||
|
Socks5ProxyPassword: proxyPassword,
|
||||||
|
}
|
||||||
|
dialer, err := conf.GetDialer()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var proxyConn net.Conn
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
proxyConn, err = dialer.Dial("tcp", l.Addr().String())
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
require.NotNil(t, proxyConn)
|
||||||
|
defer func() { require.NoError(t, proxyConn.Close()) }()
|
||||||
|
|
||||||
|
serverConn, err := l.Accept()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() { require.NoError(t, serverConn.Close()) }()
|
||||||
|
|
||||||
|
writePayload := []byte("test")
|
||||||
|
_, err = proxyConn.Write(writePayload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
receivePayload := make([]byte, 4)
|
||||||
|
_, err = serverConn.Read(receivePayload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, writePayload, receivePayload)
|
||||||
|
}
|
||||||
|
|
@ -113,6 +113,12 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
## Use TLS but skip chain & host verification
|
## Use TLS but skip chain & host verification
|
||||||
# insecure_skip_verify = false
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Optional SOCKS5 proxy to use when connecting to brokers
|
||||||
|
# socks5_enabled = true
|
||||||
|
# socks5_address = "127.0.0.1:1080"
|
||||||
|
# socks5_username = "alice"
|
||||||
|
# socks5_password = "pass123"
|
||||||
|
|
||||||
## Optional SASL Config
|
## Optional SASL Config
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/common/kafka"
|
"github.com/influxdata/telegraf/plugins/common/kafka"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
@ -32,6 +33,8 @@ type Kafka struct {
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
RoutingKey string `toml:"routing_key"`
|
RoutingKey string `toml:"routing_key"`
|
||||||
|
|
||||||
|
proxy.Socks5ProxyConfig
|
||||||
|
|
||||||
// Legacy TLS config options
|
// Legacy TLS config options
|
||||||
// TLS client certificate
|
// TLS client certificate
|
||||||
Certificate string
|
Certificate string
|
||||||
|
|
@ -189,6 +192,12 @@ var sampleConfig = `
|
||||||
## Use TLS but skip chain & host verification
|
## Use TLS but skip chain & host verification
|
||||||
# insecure_skip_verify = false
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Optional SOCKS5 proxy to use when connecting to brokers
|
||||||
|
# socks5_enabled = true
|
||||||
|
# socks5_address = "127.0.0.1:1080"
|
||||||
|
# socks5_username = "alice"
|
||||||
|
# socks5_password = "pass123"
|
||||||
|
|
||||||
## Optional SASL Config
|
## Optional SASL Config
|
||||||
# sasl_username = "kafka"
|
# sasl_username = "kafka"
|
||||||
# sasl_password = "secret"
|
# sasl_password = "secret"
|
||||||
|
|
@ -292,6 +301,16 @@ func (k *Kafka) Init() error {
|
||||||
k.TLSKey = k.Key
|
k.TLSKey = k.Key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if k.Socks5ProxyEnabled {
|
||||||
|
config.Net.Proxy.Enable = true
|
||||||
|
|
||||||
|
dialer, err := k.Socks5ProxyConfig.GetDialer()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("connecting to proxy server failed: %s", err)
|
||||||
|
}
|
||||||
|
config.Net.Proxy.Dialer = dialer
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue