Merge pull request #355 from CopernicaMarketingSoftware/connect-timeout

Implement connectTimeout option on the TcpConnection initial resolve
This commit is contained in:
Emiel Bruijntjes 2020-10-05 14:30:51 +02:00 committed by GitHub
commit 668bf8229c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 15 deletions

View File

@ -26,7 +26,7 @@ namespace AMQP {
*/
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_handler(handler),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure())),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5))),
_connection(this, address.login(), address.vhost())
{
// tell the handler

View File

@ -53,6 +53,12 @@ private:
* @var uint16_t
*/
uint16_t _port;
/**
* Timeout for the connect call in seconds.
* @var int
*/
int _timeout;
/**
* A pipe that is used to send back the socket that is connected to RabbitMQ
@ -93,6 +99,9 @@ private:
// get address info
AddressInfo addresses(_hostname.data(), _port);
// an fdset to monitor for writability
fd_set writeset;
// iterate over the addresses
for (size_t i = 0; i < addresses.size(); ++i)
{
@ -101,12 +110,47 @@ private:
// move on on failure
if (_socket < 0) continue;
// connect to the socket
// turn socket into a non-blocking socket and set the close-on-exec bit
fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC);
// try to connect non-blocking
if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break;
// we set the timeout to a timeout, with 5 seconds as the default
struct timeval timeout{_timeout,0};
// reset the fdset
FD_ZERO(&writeset);
// set the fd to monitor for writing
FD_SET(_socket, &writeset);
// perform a select, wait for something to happen on one of the fds
int ret = select(_socket + 1, nullptr, &writeset, nullptr, &timeout);
// log the error for the time being
_error = strerror(errno);
if (ret == 0) _error = "connection timed out";
// otherwise, select might've failed
else if (ret < 0) _error = strerror(errno);
// otherwise the connect failed/succeeded
else
{
// the error
int err = 0;
socklen_t len = 4;
// get the options
getsockopt(_socket, SOL_SOCKET, SO_ERROR, &err, &len);
// if the error is zero, we break, socket is now valid
if (err == 0) break;
// set the error with the value
_error = strerror(err);
}
// close socket because connect failed
::close(_socket);
@ -118,9 +162,6 @@ private:
// connection succeeded, mark socket as non-blocking
if (_socket >= 0)
{
// turn socket into a non-blocking socket and set the close-on-exec bit
fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC);
// we want to enable "nodelay" on sockets (otherwise all send operations are s-l-o-w
int optval = 1;
@ -138,11 +179,8 @@ private:
_error = error.what();
}
// notify the master thread by sending a byte over the pipe
if (!_pipe.notify())
{
_error = strerror(errno);
}
// notify the master thread by sending a byte over the pipe, store error if this fails
if (!_pipe.notify()) _error = strerror(errno);
}
public:
@ -152,12 +190,14 @@ public:
* @param hostname The hostname for the lookup
* @param portnumber The portnumber for the lookup
* @param secure Do we need a secure tls connection when ready?
* @param timeout timeout per connection attempt
*/
TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure) :
TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout) :
TcpExtState(parent),
_hostname(std::move(hostname)),
_secure(secure),
_port(port)
_port(port),
_timeout(timeout)
{
// tell the event loop to monitor the filedescriptor of the pipe
parent->onIdle(this, _pipe.in(), readable);