From 45704965471233da16b5475cc481ecb416101407 Mon Sep 17 00:00:00 2001 From: aljar Date: Fri, 13 Nov 2020 17:10:30 +0100 Subject: [PATCH 1/4] Add option to select an IP randomly instead of using the order provided by getaddrinfo, which is proximity based. --- include/amqpcpp/linux_tcp/tcpconnection.h | 3 ++- src/linux_tcp/addressinfo.h | 30 ++++++++++++++++++++--- src/linux_tcp/tcpconnection.cpp | 6 ++--- src/linux_tcp/tcpresolver.h | 14 ++++++++--- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index a8dae17..6167f79 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -192,8 +192,9 @@ public: * Constructor * @param handler User implemented handler object * @param hostname The address to connect to + * @param random Randomly select one of the IP addresses that belong to the hostname */ - TcpConnection(TcpHandler *handler, const Address &address); + TcpConnection(TcpHandler *handler, const Address &address, bool random = false); /** * No copying diff --git a/src/linux_tcp/addressinfo.h b/src/linux_tcp/addressinfo.h index bb72532..b9df839 100644 --- a/src/linux_tcp/addressinfo.h +++ b/src/linux_tcp/addressinfo.h @@ -1,12 +1,17 @@ /** * AddressInfo.h * - * Utility wrapper arround "getAddressInfo()" + * Utility wrapper around "getAddressInfo()" * * @author Emiel Bruijntjes * @copyright 2015 Copernica BV */ +/** + * Dependencies + */ +#include + /** * Include guard */ @@ -35,8 +40,9 @@ public: * Constructor * @param hostname * @param port + * @param randomOrder */ - AddressInfo(const char *hostname, uint16_t port = 5672) + AddressInfo(const char *hostname, uint16_t port = 5672, bool randomOrder = false) { // store portnumber in buffer auto portnumber = std::to_string(port); @@ -63,6 +69,25 @@ public: // store in vector _v.push_back(current); } + + // Do we want to have a random order of the addresses? + // This may be useful since getaddrinfo is sorting the addresses on proximity + // (e.g. https://lists.debian.org/debian-glibc/2007/09/msg00347.html), + // which may break loadbalancing.. + if (randomOrder) + { + // We need to seed the random number generator. Normally time is taken + // for this. Since we want to have randomness within a second we use + // more precision via timeval + struct timeval time; + gettimeofday(&time, nullptr); + + // We seed with the precision of miliseconds. + srand((time.tv_sec * 1000) + (time.tv_usec / 1000)); + + // shuffle the vector. + std::random_shuffle(_v.begin(), _v.end()); + } } /** @@ -99,4 +124,3 @@ public: * End of namespace */ } - diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index 7d9e8cc..316fd0e 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -23,10 +23,11 @@ namespace AMQP { * Constructor * @param handler User implemented handler object * @param hostname The address to connect to + * @param random Randomly select one of the IP addresses that belong to the hostname */ -TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : +TcpConnection::TcpConnection(TcpHandler *handler, const Address &address, bool random) : _handler(handler), - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5))), + _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5), random)), _connection(this, address.login(), address.vhost()) { // tell the handler @@ -286,4 +287,3 @@ void TcpConnection::onLost(TcpState *state) * End of namespace */ } - diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index b5c2fa4..a44038d 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -85,6 +85,12 @@ private: */ std::thread _thread; + /** + * Do we want to have random ordering + * @var bool + */ + bool _random; + /** * Run the thread @@ -98,7 +104,7 @@ private: if (_secure && !OpenSSL::valid()) throw std::runtime_error("Secure connection cannot be established: libssl.so cannot be loaded"); // get address info - AddressInfo addresses(_hostname.data(), _port); + AddressInfo addresses(_hostname.data(), _port, _random); // the pollfd structure, needed for poll() pollfd fd; @@ -188,13 +194,15 @@ public: * @param portnumber The portnumber for the lookup * @param secure Do we need a secure tls connection when ready? * @param timeout timeout per connection attempt + * @param random Do we want to have random oredering of the address of the host to connect to */ - TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout) : + TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout, bool random) : TcpExtState(parent), _hostname(std::move(hostname)), _secure(secure), _port(port), - _timeout(timeout) + _timeout(timeout), + _random(random) { // tell the event loop to monitor the filedescriptor of the pipe parent->onIdle(this, _pipe.in(), readable); From 11f1eaf2c34f1e26648fe1be5b192017338ca86b Mon Sep 17 00:00:00 2001 From: aljar Date: Fri, 13 Nov 2020 17:49:14 +0100 Subject: [PATCH 2/4] Don't use the old random_shuffle and don't set a global seed. --- src/linux_tcp/addressinfo.h | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/linux_tcp/addressinfo.h b/src/linux_tcp/addressinfo.h index b9df839..550bf96 100644 --- a/src/linux_tcp/addressinfo.h +++ b/src/linux_tcp/addressinfo.h @@ -10,7 +10,7 @@ /** * Dependencies */ -#include +#include /** * Include guard @@ -76,17 +76,14 @@ public: // which may break loadbalancing.. if (randomOrder) { - // We need to seed the random number generator. Normally time is taken - // for this. Since we want to have randomness within a second we use - // more precision via timeval - struct timeval time; - gettimeofday(&time, nullptr); + // create a random device for the seed of the random number generator + std::random_device rd; - // We seed with the precision of miliseconds. - srand((time.tv_sec * 1000) + (time.tv_usec / 1000)); + // Create the generator + std::mt19937 gen(rd()); - // shuffle the vector. - std::random_shuffle(_v.begin(), _v.end()); + // shuffle the vector. + std::shuffle(_v.begin(), _v.end(), gen); } } From fe1538e8ae2e6a7a15fcfee0f8f11b6c4f0deef3 Mon Sep 17 00:00:00 2001 From: aljar Date: Fri, 13 Nov 2020 18:05:07 +0100 Subject: [PATCH 3/4] Set option via address. --- include/amqpcpp/linux_tcp/tcpconnection.h | 3 +-- src/linux_tcp/tcpconnection.cpp | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 6167f79..a8dae17 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -192,9 +192,8 @@ public: * Constructor * @param handler User implemented handler object * @param hostname The address to connect to - * @param random Randomly select one of the IP addresses that belong to the hostname */ - TcpConnection(TcpHandler *handler, const Address &address, bool random = false); + TcpConnection(TcpHandler *handler, const Address &address); /** * No copying diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index 316fd0e..70b4035 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -23,11 +23,10 @@ namespace AMQP { * Constructor * @param handler User implemented handler object * @param hostname The address to connect to - * @param random Randomly select one of the IP addresses that belong to the hostname */ -TcpConnection::TcpConnection(TcpHandler *handler, const Address &address, bool random) : +TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : _handler(handler), - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5), random)), + _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5), address.option("randomConnect", false))), _connection(this, address.login(), address.vhost()) { // tell the handler From f9d85f5d01c34d63cf91fa8d55e0c4203c2a9d92 Mon Sep 17 00:00:00 2001 From: aljar Date: Mon, 16 Nov 2020 16:30:29 +0100 Subject: [PATCH 4/4] Use address to pass the option and add some extra options for sorting ips. --- src/linux_tcp/address2str.h | 108 ++++++++++++++++++++++++++++++ src/linux_tcp/addressinfo.h | 112 +++++++++++++++++++++++++++----- src/linux_tcp/connectionorder.h | 73 +++++++++++++++++++++ src/linux_tcp/tcpconnection.cpp | 2 +- src/linux_tcp/tcpresolver.h | 12 ++-- 5 files changed, 283 insertions(+), 24 deletions(-) create mode 100644 src/linux_tcp/address2str.h create mode 100644 src/linux_tcp/connectionorder.h diff --git a/src/linux_tcp/address2str.h b/src/linux_tcp/address2str.h new file mode 100644 index 0000000..d3ca683 --- /dev/null +++ b/src/linux_tcp/address2str.h @@ -0,0 +1,108 @@ +/** + * Address2Str.h + * + * Helper class to get a stringed version of an address obtained from get_addrinfo + * + * @author Aljar Meesters +#include + +/** + * Opening namespace + */ +namespace AMQP { + +/** + * class implemenation + */ +class Address2str +{ +private: + /** + * The address as string + */ + char *_buffer = nullptr; + +public: + /** + * Constructor + * @param struct addrinfo * + */ + Address2str(struct addrinfo *info) + { + // Switch on family + switch(info->ai_family) + { + // If we are ip4 + case AF_INET: + { + // ugly cast + struct sockaddr_in *addr_in = (struct sockaddr_in *)(info->ai_addr); + + // create the buffer + _buffer = new char[INET_ADDRSTRLEN]; + + // get the info in our buffer + inet_ntop(AF_INET, &(addr_in->sin_addr), _buffer, INET_ADDRSTRLEN); + + // done + break; + } + + // if we are ipv6 + case AF_INET6: + { + // ugly cast + struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *)(info->ai_addr); + + // crate buffer + _buffer = new char[INET6_ADDRSTRLEN]; + + // get the info in our buffer + inet_ntop(AF_INET6, &(addr_in6->sin6_addr), _buffer, INET6_ADDRSTRLEN); + + // done + break; + } + default: + // If it is not ipv4 or ipv6 we don't set it. + break; + } + } + + /** + * Destruction + */ + virtual ~Address2str() + { + // if we have a buffer, we should free it. + if (_buffer) delete[](_buffer); + } + + /** + * get the char + * @return char* + */ + const char *toChar() const + { + // expose the buffer. + return _buffer; + } +}; + + +/** + * ending of namespace + */ +} \ No newline at end of file diff --git a/src/linux_tcp/addressinfo.h b/src/linux_tcp/addressinfo.h index 550bf96..85c116e 100644 --- a/src/linux_tcp/addressinfo.h +++ b/src/linux_tcp/addressinfo.h @@ -11,6 +11,8 @@ * Dependencies */ #include +#include "connectionorder.h" +#include "address2str.h" /** * Include guard @@ -35,14 +37,103 @@ private: */ std::vector _v; + /** + * Helper function to order the vector of addrinfo based on the ordering received + * @param order + */ + void reorder(ConnectionOrder::Order order) + { + // witch on order + switch (order) + { + // Do we want to have a random order of the addresses? + // This may be useful since getaddrinfo is sorting the addresses on proximity + // (e.g. https://lists.debian.org/debian-glibc/2007/09/msg00347.html), + // which may break loadbalancing.. + case ConnectionOrder::Order::random: + { + // create a random device for the seed of the random number generator + std::random_device rd; + + // Create the generator + std::mt19937 gen(rd()); + + // shuffle the vector. + std::shuffle(_v.begin(), _v.end(), gen); + + // done + break; + } + // do we want to sort in ascending order + case ConnectionOrder::Order::ascending: + { + std::sort(_v.begin(), _v.end(), [] + (struct addrinfo * v1, struct addrinfo * v2) -> bool + { + // get the addresses + Address2str addr1(v1); + Address2str addr2(v2); + + // if addr1 doesn't have a proper address it should go to the + // back. Same holds for addr2 + if (addr1.toChar() == nullptr) return false; + if (addr2.toChar() == nullptr) return true; + + // make the comparison based on string comparison + return strcmp(addr1.toChar(), addr2.toChar()) < 0; + }); + + // done + break; + } + + // do we want to sort in descending order + case ConnectionOrder::Order::descending: + { + std::sort(_v.begin(), _v.end(), [] + (struct addrinfo * v1, struct addrinfo * v2) -> bool + { + // get the addresses + Address2str addr1(v1); + Address2str addr2(v2); + + // if addr1 doesn't have a proper address it should go to the + // back. Same holds for addr2 + if (addr1.toChar() == nullptr) return false; + if (addr2.toChar() == nullptr) return true; + + // make the comparison based on string comparison + return strcmp(addr1.toChar(), addr2.toChar()) > 0; + }); + + // done + break; + } + + // de we want to have reverse ordering of proximity + case ConnectionOrder::Order::reverse: + { + std::reverse(_v.begin(), _v.end()); + + // done + break; + } + + default: + // nothing to do, just default behaviour + break; + } + } + + public: /** * Constructor * @param hostname * @param port - * @param randomOrder + * @param order */ - AddressInfo(const char *hostname, uint16_t port = 5672, bool randomOrder = false) + AddressInfo(const char *hostname, uint16_t port = 5672, ConnectionOrder::Order order = ConnectionOrder::Order::standard) { // store portnumber in buffer auto portnumber = std::to_string(port); @@ -70,21 +161,8 @@ public: _v.push_back(current); } - // Do we want to have a random order of the addresses? - // This may be useful since getaddrinfo is sorting the addresses on proximity - // (e.g. https://lists.debian.org/debian-glibc/2007/09/msg00347.html), - // which may break loadbalancing.. - if (randomOrder) - { - // create a random device for the seed of the random number generator - std::random_device rd; - - // Create the generator - std::mt19937 gen(rd()); - - // shuffle the vector. - std::shuffle(_v.begin(), _v.end(), gen); - } + // Order the vector based on the provided ordering + reorder(order); } /** diff --git a/src/linux_tcp/connectionorder.h b/src/linux_tcp/connectionorder.h new file mode 100644 index 0000000..18cd203 --- /dev/null +++ b/src/linux_tcp/connectionorder.h @@ -0,0 +1,73 @@ +/** + * ConnectionOrder.h + * + * Class that give info on how we want to sellect the connection from a list of IPs + * + * @author Aljar Meesters + * @copyright 2020 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Setup namespace + */ +namespace AMQP +{ + +/** + * Class implementation + */ +class ConnectionOrder +{ +public: + /** + * Enum class holding the orders we support + * - standard: what is used by getaddrinfo, which is proximity based + * - reverse: reverse the standard order + * - random: random order + * - ascending: try the smallest IP address first + * - descending: try the largest IP address first + * @var enum Order + */ + enum class Order { standard, reverse, random, ascending, descending}; + +private: + /** + * The order for the connection + * @var Order + */ + Order _order; + +public: + /** + * Constructor + * @var order + */ + ConnectionOrder (const char *order) : _order(Order::standard) + { + // Set the orders based on the string + if (strcmp(order, "random") == 0) _order = Order::random; + else if (strcmp(order, "ascending") == 0 || strcmp(order, "asc") == 0) _order = Order::ascending; + else if (strcmp(order, "descending") == 0 || strcmp(order, "desc") == 0 ) _order = Order::descending; + + // If we don't ave a match we fall back to standard, which was set as default + } + + /** + * Get the order + * @return Order + */ + Order order() + { + return _order; + } +}; + +/** + * End of namespace + */ +} \ No newline at end of file diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index 70b4035..a4544ab 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -26,7 +26,7 @@ namespace AMQP { */ TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : _handler(handler), - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5), address.option("randomConnect", false))), + _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5), ConnectionOrder(address.option("connectionOrder")).order())), _connection(this, address.login(), address.vhost()) { // tell the handler diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index a44038d..3c437cc 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -86,10 +86,10 @@ private: std::thread _thread; /** - * Do we want to have random ordering + * How should the addresses be ordered when we want to connect * @var bool */ - bool _random; + ConnectionOrder::Order _order; /** @@ -104,7 +104,7 @@ private: if (_secure && !OpenSSL::valid()) throw std::runtime_error("Secure connection cannot be established: libssl.so cannot be loaded"); // get address info - AddressInfo addresses(_hostname.data(), _port, _random); + AddressInfo addresses(_hostname.data(), _port, _order); // the pollfd structure, needed for poll() pollfd fd; @@ -194,15 +194,15 @@ public: * @param portnumber The portnumber for the lookup * @param secure Do we need a secure tls connection when ready? * @param timeout timeout per connection attempt - * @param random Do we want to have random oredering of the address of the host to connect to + * @param order How should we oreder the addresses of the host to connect to */ - TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout, bool random) : + TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout, ConnectionOrder::Order order) : TcpExtState(parent), _hostname(std::move(hostname)), _secure(secure), _port(port), _timeout(timeout), - _random(random) + _order(order) { // tell the event loop to monitor the filedescriptor of the pipe parent->onIdle(this, _pipe.in(), readable);