feat(outputs.influxdb): Add option to define local address (#15225)
This commit is contained in:
parent
996e104e0c
commit
bf55d782f0
|
|
@ -34,6 +34,10 @@ to use them.
|
||||||
# urls = ["udp://127.0.0.1:8089"]
|
# urls = ["udp://127.0.0.1:8089"]
|
||||||
# urls = ["http://127.0.0.1:8086"]
|
# urls = ["http://127.0.0.1:8086"]
|
||||||
|
|
||||||
|
## Local address to bind when connecting to the server
|
||||||
|
## If empty or not set, the local address is automatically chosen.
|
||||||
|
# local_address = ""
|
||||||
|
|
||||||
## The target database for metrics; will be created as needed.
|
## The target database for metrics; will be created as needed.
|
||||||
## For UDP url endpoint database needs to be configured on server side.
|
## For UDP url endpoint database needs to be configured on server side.
|
||||||
# database = "telegraf"
|
# database = "telegraf"
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,7 @@ func (r WriteResponseError) Error() string {
|
||||||
|
|
||||||
type HTTPConfig struct {
|
type HTTPConfig struct {
|
||||||
URL *url.URL
|
URL *url.URL
|
||||||
|
LocalAddr *net.TCPAddr
|
||||||
UserAgent string
|
UserAgent string
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
Username config.Secret
|
Username config.Secret
|
||||||
|
|
@ -164,9 +165,15 @@ func NewHTTPClient(cfg HTTPConfig) (*httpClient, error) {
|
||||||
var transport *http.Transport
|
var transport *http.Transport
|
||||||
switch cfg.URL.Scheme {
|
switch cfg.URL.Scheme {
|
||||||
case "http", "https":
|
case "http", "https":
|
||||||
|
var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
if cfg.LocalAddr != nil {
|
||||||
|
dialer := &net.Dialer{LocalAddr: cfg.LocalAddr}
|
||||||
|
dialerFunc = dialer.DialContext
|
||||||
|
}
|
||||||
transport = &http.Transport{
|
transport = &http.Transport{
|
||||||
Proxy: proxy,
|
Proxy: proxy,
|
||||||
TLSClientConfig: cfg.TLSConfig,
|
TLSClientConfig: cfg.TLSConfig,
|
||||||
|
DialContext: dialerFunc,
|
||||||
}
|
}
|
||||||
case "unix":
|
case "unix":
|
||||||
transport = &http.Transport{
|
transport = &http.Transport{
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,10 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -37,6 +40,7 @@ type Client interface {
|
||||||
// InfluxDB struct is the primary data structure for the plugin
|
// InfluxDB struct is the primary data structure for the plugin
|
||||||
type InfluxDB struct {
|
type InfluxDB struct {
|
||||||
URLs []string `toml:"urls"`
|
URLs []string `toml:"urls"`
|
||||||
|
LocalAddr string `toml:"local_address"`
|
||||||
Username config.Secret `toml:"username"`
|
Username config.Secret `toml:"username"`
|
||||||
Password config.Secret `toml:"password"`
|
Password config.Secret `toml:"password"`
|
||||||
Database string `toml:"database"`
|
Database string `toml:"database"`
|
||||||
|
|
@ -89,16 +93,54 @@ func (i *InfluxDB) Connect() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var localIP *net.IPAddr
|
||||||
|
var localPort int
|
||||||
|
if i.LocalAddr != "" {
|
||||||
|
var err error
|
||||||
|
// Resolve the local address into IP address and the given port if any
|
||||||
|
addr, sPort, err := net.SplitHostPort(i.LocalAddr)
|
||||||
|
if err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "missing port") {
|
||||||
|
return fmt.Errorf("invalid local address: %w", err)
|
||||||
|
}
|
||||||
|
addr = i.LocalAddr
|
||||||
|
}
|
||||||
|
localIP, err = net.ResolveIPAddr("ip", addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot resolve local address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sPort != "" {
|
||||||
|
p, err := strconv.ParseUint(sPort, 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid port: %w", err)
|
||||||
|
}
|
||||||
|
localPort = int(p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch parts.Scheme {
|
switch parts.Scheme {
|
||||||
case "udp", "udp4", "udp6":
|
case "udp", "udp4", "udp6":
|
||||||
c, err := i.udpClient(parts)
|
var c Client
|
||||||
|
var err error
|
||||||
|
if i.LocalAddr == "" {
|
||||||
|
c, err = i.udpClient(parts, nil)
|
||||||
|
} else {
|
||||||
|
c, err = i.udpClient(parts, &net.UDPAddr{IP: localIP.IP, Port: localPort, Zone: localIP.Zone})
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
i.clients = append(i.clients, c)
|
i.clients = append(i.clients, c)
|
||||||
case "http", "https", "unix":
|
case "http", "https", "unix":
|
||||||
c, err := i.httpClient(ctx, parts, proxy)
|
var c Client
|
||||||
|
var err error
|
||||||
|
if i.LocalAddr == "" {
|
||||||
|
c, err = i.httpClient(ctx, parts, nil, proxy)
|
||||||
|
} else {
|
||||||
|
c, err = i.httpClient(ctx, parts, &net.TCPAddr{IP: localIP.IP, Port: localPort, Zone: localIP.Zone}, proxy)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -159,7 +201,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
return errors.New("could not write any address")
|
return errors.New("could not write any address")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *InfluxDB) udpClient(address *url.URL) (Client, error) {
|
func (i *InfluxDB) udpClient(address *url.URL, localAddr *net.UDPAddr) (Client, error) {
|
||||||
serializer := &influx.Serializer{UintSupport: i.InfluxUintSupport}
|
serializer := &influx.Serializer{UintSupport: i.InfluxUintSupport}
|
||||||
if err := serializer.Init(); err != nil {
|
if err := serializer.Init(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -167,6 +209,7 @@ func (i *InfluxDB) udpClient(address *url.URL) (Client, error) {
|
||||||
|
|
||||||
udpConfig := &UDPConfig{
|
udpConfig := &UDPConfig{
|
||||||
URL: address,
|
URL: address,
|
||||||
|
LocalAddr: localAddr,
|
||||||
MaxPayloadSize: int(i.UDPPayload),
|
MaxPayloadSize: int(i.UDPPayload),
|
||||||
Serializer: serializer,
|
Serializer: serializer,
|
||||||
Log: i.Log,
|
Log: i.Log,
|
||||||
|
|
@ -180,7 +223,7 @@ func (i *InfluxDB) udpClient(address *url.URL) (Client, error) {
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.URL) (Client, error) {
|
func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, localAddr *net.TCPAddr, proxy *url.URL) (Client, error) {
|
||||||
tlsConfig, err := i.ClientConfig.TLSConfig()
|
tlsConfig, err := i.ClientConfig.TLSConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -193,6 +236,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.
|
||||||
|
|
||||||
httpConfig := &HTTPConfig{
|
httpConfig := &HTTPConfig{
|
||||||
URL: address,
|
URL: address,
|
||||||
|
LocalAddr: localAddr,
|
||||||
Timeout: time.Duration(i.Timeout),
|
Timeout: time.Duration(i.Timeout),
|
||||||
TLSConfig: tlsConfig,
|
TLSConfig: tlsConfig,
|
||||||
UserAgent: i.UserAgent,
|
UserAgent: i.UserAgent,
|
||||||
|
|
|
||||||
|
|
@ -214,3 +214,23 @@ func TestWriteRecreateDatabaseIfDatabaseNotFound(t *testing.T) {
|
||||||
// We only have one URL, so we expect an error
|
// We only have one URL, so we expect an error
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInfluxDBLocalAddress(t *testing.T) {
|
||||||
|
output := influxdb.InfluxDB{
|
||||||
|
URLs: []string{"http://localhost:8086"},
|
||||||
|
LocalAddr: "localhost",
|
||||||
|
|
||||||
|
CreateHTTPClientF: func(_ *influxdb.HTTPConfig) (influxdb.Client, error) {
|
||||||
|
return &MockClient{
|
||||||
|
DatabaseF: func() string {
|
||||||
|
return "telegraf"
|
||||||
|
},
|
||||||
|
CreateDatabaseF: func() error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, output.Connect())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,10 @@
|
||||||
# urls = ["udp://127.0.0.1:8089"]
|
# urls = ["udp://127.0.0.1:8089"]
|
||||||
# urls = ["http://127.0.0.1:8086"]
|
# urls = ["http://127.0.0.1:8086"]
|
||||||
|
|
||||||
|
## Local address to bind when connecting to the server
|
||||||
|
## If empty or not set, the local address is automatically chosen.
|
||||||
|
# local_address = ""
|
||||||
|
|
||||||
## The target database for metrics; will be created as needed.
|
## The target database for metrics; will be created as needed.
|
||||||
## For UDP url endpoint database needs to be configured on server side.
|
## For UDP url endpoint database needs to be configured on server side.
|
||||||
# database = "telegraf"
|
# database = "telegraf"
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ type Conn interface {
|
||||||
type UDPConfig struct {
|
type UDPConfig struct {
|
||||||
MaxPayloadSize int
|
MaxPayloadSize int
|
||||||
URL *url.URL
|
URL *url.URL
|
||||||
|
LocalAddr *net.UDPAddr
|
||||||
Serializer *influx.Serializer
|
Serializer *influx.Serializer
|
||||||
Dialer Dialer
|
Dialer Dialer
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
|
|
@ -55,7 +56,7 @@ func NewUDPClient(config UDPConfig) (*udpClient, error) {
|
||||||
|
|
||||||
dialer := config.Dialer
|
dialer := config.Dialer
|
||||||
if dialer == nil {
|
if dialer == nil {
|
||||||
dialer = &netDialer{net.Dialer{}}
|
dialer = &netDialer{net.Dialer{LocalAddr: config.LocalAddr}}
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &udpClient{
|
client := &udpClient{
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue