From 12ffe885c1216b0cba04a3a32c055bc3ce16dfd1 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Thu, 19 Oct 2017 14:44:29 +0100 Subject: [PATCH 1/5] Cleaned up existing boost handler. There were a few bits that needed tidying up. --- include/libboostasio.h | 84 ++++++++++++++++++++++++++++++------------ 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/include/libboostasio.h b/include/libboostasio.h index b8e0758..9e90215 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -19,8 +19,19 @@ #include #include #include +#include +// boost::function< void(boost::system::error_code ec) > f = +#define STRAND_HANDLER(_fn) \ +[this, fn = _fn, strand = _strand](const boost::system::error_code ec) \ +{ \ + const std::shared_ptr apStrand = strand.lock(); \ + if (!apStrand) { return; } \ + \ + apStrand->dispatch(boost::bind(fn,ec)); \ +} + /** * Set up namespace */ @@ -87,10 +98,10 @@ private: * @param fd The file descriptor being watched. * @note The handler will get called if a read is cancelled. */ - void read_handler(boost::system::error_code ec, - std::weak_ptr awpWatcher, - TcpConnection *connection, - int fd) + void read_handler(const boost::system::error_code ec, + const std::weak_ptr awpWatcher, + TcpConnection *const connection, + const int fd) { // Resolve any potential problems with dangling pointers // (remember we are using async). @@ -128,10 +139,10 @@ private: * @param fd The file descriptor being watched. * @note The handler will get called if a write is cancelled. */ - void write_handler(boost::system::error_code ec, - std::weak_ptr awpWatcher, - TcpConnection *connection, - int fd) + void write_handler(const boost::system::error_code ec, + const std::weak_ptr awpWatcher, + TcpConnection *const connection, + const int fd) { // Resolve any potential problems with dangling pointers // (remember we are using async). @@ -168,14 +179,13 @@ private: * @param io_service The boost io_service * @param fd The filedescriptor being watched */ - Watcher(boost::asio::io_service &io_service, int fd) : + Watcher(boost::asio::io_service &io_service, const int fd) : _ioservice(io_service), _socket(_ioservice) { _socket.assign(fd); _socket.non_blocking(true); - } /** @@ -205,6 +215,7 @@ private: // 1. Handle reads? _read = ((events & AMQP::readable) != 0); + // Read requsted but no read pending? if (_read && !_read_pending) { _read_pending = true; @@ -226,6 +237,7 @@ private: // 2. Handle writes? _write = ((events & AMQP::writable) != 0); + // Write requested but no write pending? if (_write && !_write_pending) { _write_pending = true; @@ -247,8 +259,8 @@ private: }; /** - * Timer class to periodically fire a heartbeat - */ + * Timer class to periodically fire a heartbeat + */ class Timer : public std::enable_shared_from_this { private: @@ -258,7 +270,7 @@ private: * @var class boost::asio::io_service& */ boost::asio::io_service & _ioservice; - + /** * The boost asynchronous deadline timer. * @var class boost::asio::deadline_timer @@ -273,8 +285,8 @@ private: */ void timeout(const boost::system::error_code &ec, std::weak_ptr awpThis, - TcpConnection *connection, - uint16_t timeout) + TcpConnection *const connection, + const uint16_t timeout) { // Resolve any potential problems with dangling pointers // (remember we are using async). @@ -289,11 +301,21 @@ private: connection->heartbeat(); } - // Reschedule the timer for 1 second in the future: + // Reschedule the timer for the future: _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); // Posts the timer event - _timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(), connection, timeout)); + _timer.async_wait(boost::bind(&Timer::timeout, + this, + boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif + connection, + timeout)); } } @@ -345,9 +367,18 @@ private: // stop timer in case it was already set stop(); - _timer.expires_from_now(boost::posix_time::seconds(timeout)); - _timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(),connection, timeout)); + _timer.async_wait(boost::bind(&Timer::timeout, + this, + boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif + connection, + timeout)); } }; @@ -374,7 +405,9 @@ private: * @param fd The filedescriptor to be monitored * @param flags Should the object be monitored for readability or writability? */ - void monitor(TcpConnection *connection, int fd, int flags) override + void monitor(TcpConnection *const connection, + const int fd, + const int flags) override { // do we already have this filedescriptor auto iter = _watchers.find(fd); @@ -386,8 +419,13 @@ private: if (flags == 0){ return; } // construct a new pair (watcher/timer), and put it in the map - _watchers[fd] = std::make_shared(_ioservice, fd); - _watchers[fd]->events(connection, fd, flags); + const std::shared_ptr apWatcher = + std::make_shared(_ioservice, fd); + + _watchers[fd] = apWatcher; + + // explicitly set the events to monitor + apWatcher->events(connection, fd, flags); } else if (flags == 0) { @@ -431,7 +469,7 @@ public: /** * Constructor - * @param loop The boost io_service to wrap + * @param io_service The boost io_service to wrap */ explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : _ioservice(io_service), From a9daf7e7699b598a61d89c15cb8d90cf31792e85 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Thu, 19 Oct 2017 15:59:40 +0100 Subject: [PATCH 2/5] Added strand to LibBoostAsioHandler.h to ensure that 1 thread goes through the 'gate' (from the io_service set of threads) at any one time. --- include/libboostasio.h | 163 +++++++++++++++++++++++++++-------------- 1 file changed, 110 insertions(+), 53 deletions(-) diff --git a/include/libboostasio.h b/include/libboostasio.h index 9e90215..5ed6489 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -16,18 +16,39 @@ /** * Dependencies */ +#include + #include #include #include #include -// boost::function< void(boost::system::error_code ec) > f = -#define STRAND_HANDLER(_fn) \ -[this, fn = _fn, strand = _strand](const boost::system::error_code ec) \ +/////////////////////////////////////////////////////////////////// +#define STRAND_SOCKET_HANDLER(_fn) \ +[fn = _fn, strand = _strand](const boost::system::error_code &ec, \ + const std::size_t bytes_transferred) \ { \ const std::shared_ptr apStrand = strand.lock(); \ - if (!apStrand) { return; } \ + if (!apStrand) \ + { \ + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \ + return; \ + } \ + \ + apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \ +} + +/////////////////////////////////////////////////////////////////// +#define STRAND_TIMER_HANDLER(_fn) \ +[fn = _fn, strand = _strand](const boost::system::error_code &ec) \ +{ \ + const std::shared_ptr apStrand = strand.lock(); \ + if (!apStrand) \ + { \ + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \ + return; \ + } \ \ apStrand->dispatch(boost::bind(fn,ec)); \ } @@ -57,6 +78,12 @@ private: * @var class boost::asio::io_service& */ boost::asio::io_service & _ioservice; + + /** + * The boost asio io_service::strand managed pointer. + * @var class std::shared_ptr + */ + std::weak_ptr _strand; /** * The boost tcp socket. @@ -98,7 +125,8 @@ private: * @param fd The file descriptor being watched. * @note The handler will get called if a read is cancelled. */ - void read_handler(const boost::system::error_code ec, + void read_handler(const boost::system::error_code &ec, + const std::size_t bytes_transferred, const std::weak_ptr awpWatcher, TcpConnection *const connection, const int fd) @@ -115,19 +143,21 @@ private: connection->process(fd, AMQP::readable); _read_pending = true; - - _socket.async_read_some(boost::asio::null_buffers(), - boost::bind(&Watcher::read_handler, - this, - boost::placeholders::_1, + + _socket.async_read_some(boost::asio::null_buffers(), + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::read_handler, + this, + boost::placeholders::_1, + boost::placeholders::_2, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - fd)); + connection, + fd))); } } @@ -140,6 +170,7 @@ private: * @note The handler will get called if a write is cancelled. */ void write_handler(const boost::system::error_code ec, + const std::size_t bytes_transferred, const std::weak_ptr awpWatcher, TcpConnection *const connection, const int fd) @@ -158,17 +189,19 @@ private: _write_pending = true; _socket.async_write_some(boost::asio::null_buffers(), - boost::bind(&Watcher::write_handler, - this, - boost::placeholders::_1, + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::write_handler, + this, + boost::placeholders::_1, + boost::placeholders::_2, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - fd)); + connection, + fd))); } } @@ -179,8 +212,11 @@ private: * @param io_service The boost io_service * @param fd The filedescriptor being watched */ - Watcher(boost::asio::io_service &io_service, const int fd) : + Watcher(boost::asio::io_service &io_service, + const std::weak_ptr strand, + const int fd) : _ioservice(io_service), + _strand(strand), _socket(_ioservice) { _socket.assign(fd); @@ -221,17 +257,19 @@ private: _read_pending = true; _socket.async_read_some(boost::asio::null_buffers(), - boost::bind(&Watcher::read_handler, - this, - boost::placeholders::_1, + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::read_handler, + this, + boost::placeholders::_1, + boost::placeholders::_2, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - fd)); + connection, + fd))); } // 2. Handle writes? @@ -243,17 +281,19 @@ private: _write_pending = true; _socket.async_write_some(boost::asio::null_buffers(), - boost::bind(&Watcher::write_handler, - this, - boost::placeholders::_1, + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::write_handler, + this, + boost::placeholders::_1, + boost::placeholders::_2, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - fd)); + connection, + fd))); } } }; @@ -271,6 +311,12 @@ private: */ boost::asio::io_service & _ioservice; + /** + * The boost asio io_service::strand managed pointer. + * @var class std::shared_ptr + */ + std::weak_ptr _strand; + /** * The boost asynchronous deadline timer. * @var class boost::asio::deadline_timer @@ -305,17 +351,18 @@ private: _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); // Posts the timer event - _timer.async_wait(boost::bind(&Timer::timeout, - this, - boost::placeholders::_1, + _timer.async_wait(STRAND_TIMER_HANDLER( + boost::bind(&Timer::timeout, + this, + boost::placeholders::_1, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - timeout)); + connection, + timeout))); } } @@ -333,8 +380,10 @@ private: * Constructor * @param loop The current event loop */ - Timer(boost::asio::io_service &io_service) : + Timer(boost::asio::io_service &io_service, + const std::weak_ptr strand) : _ioservice(io_service), + _strand(strand), _timer(_ioservice) { @@ -368,17 +417,18 @@ private: stop(); _timer.expires_from_now(boost::posix_time::seconds(timeout)); - _timer.async_wait(boost::bind(&Timer::timeout, - this, - boost::placeholders::_1, + _timer.async_wait(STRAND_TIMER_HANDLER( + boost::bind(&Timer::timeout, + this, + boost::placeholders::_1, // C++17 has 'weak_from_this()' support. #if __cplusplus >= 201701L - weak_from_this() + weak_from_this() #else - shared_from_this(), + shared_from_this(), #endif - connection, - timeout)); + connection, + timeout))); } }; @@ -388,6 +438,12 @@ private: */ boost::asio::io_service & _ioservice; + /** + * The boost asio io_service::strand managed pointer. + * @var class std::shared_ptr + */ + std::shared_ptr _strand; + /** * All I/O watchers that are active, indexed by their filedescriptor @@ -420,7 +476,7 @@ private: // construct a new pair (watcher/timer), and put it in the map const std::shared_ptr apWatcher = - std::make_shared(_ioservice, fd); + std::make_shared(_ioservice, _strand, fd); _watchers[fd] = apWatcher; @@ -473,7 +529,8 @@ public: */ explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : _ioservice(io_service), - _timer(std::make_shared(_ioservice)) + _strand(std::make_shared(_ioservice)), + _timer(std::make_shared(_ioservice,_strand)) { } From cdd9918e6a79ac6297988cd3d796d1898cb74035 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Thu, 19 Oct 2017 16:06:41 +0100 Subject: [PATCH 3/5] Cleaned up docs/comments in LibBoostAsioHandler --- include/libboostasio.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/include/libboostasio.h b/include/libboostasio.h index 5ed6489..c1c92cf 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -120,6 +120,7 @@ private: /** * Handler method that is called by boost's io_service when the socket pumps a read event. * @param ec The status of the callback. + * @param bytes_transferred The number of bytes transferred. * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. @@ -164,6 +165,7 @@ private: /** * Handler method that is called by boost's io_service when the socket pumps a write event. * @param ec The status of the callback. + * @param bytes_transferred The number of bytes transferred. * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. @@ -210,6 +212,7 @@ private: * Constructor- initialises the watcher and assigns the filedescriptor to * a boost socket for monitoring. * @param io_service The boost io_service + * @param strand A weak pointer to a io_service::strand instance. * @param fd The filedescriptor being watched */ Watcher(boost::asio::io_service &io_service, @@ -378,7 +381,8 @@ private: public: /** * Constructor - * @param loop The current event loop + * @param io_service The boost asio io_service. + * @param strand A weak pointer to a io_service::strand instance. */ Timer(boost::asio::io_service &io_service, const std::weak_ptr strand) : From 5410f1abdd8a155f6a072c24a47ec49c5079d881 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Thu, 19 Oct 2017 16:14:21 +0100 Subject: [PATCH 4/5] Replaced tabs with spaces in LibBoostAsioHandler --- include/libboostasio.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/libboostasio.h b/include/libboostasio.h index c1c92cf..0a76c76 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -33,8 +33,8 @@ if (!apStrand) \ { \ fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \ - return; \ - } \ + return; \ + } \ \ apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \ } @@ -47,8 +47,8 @@ if (!apStrand) \ { \ fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \ - return; \ - } \ + return; \ + } \ \ apStrand->dispatch(boost::bind(fn,ec)); \ } From 034e72bbc2aa3924bb5aac3a0d1238e3781b25b9 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Mon, 23 Oct 2017 11:40:49 +0100 Subject: [PATCH 5/5] Added example/test application to demonstrate boost asio io_service handler. --- include/libboostasio.h | 21 ++++++++------- tests/libboostasio.cpp | 59 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 9 deletions(-) create mode 100644 tests/libboostasio.cpp diff --git a/include/libboostasio.h b/include/libboostasio.h index 0a76c76..2df986c 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -3,9 +3,11 @@ * * Implementation for the AMQP::TcpHandler that is optimized for boost::asio. You can * use this class instead of a AMQP::TcpHandler class, just pass the boost asio service - * to the constructor and you're all set + * to the constructor and you're all set. See tests/libboostasio.cpp for example. * * @author Gavin Smith + * + * */ /** @@ -19,9 +21,10 @@ #include #include +#include +#include #include #include -#include /////////////////////////////////////////////////////////////////// @@ -78,7 +81,7 @@ private: * @var class boost::asio::io_service& */ boost::asio::io_service & _ioservice; - + /** * The boost asio io_service::strand managed pointer. * @var class std::shared_ptr @@ -127,7 +130,7 @@ private: * @note The handler will get called if a read is cancelled. */ void read_handler(const boost::system::error_code &ec, - const std::size_t bytes_transferred, + const std::size_t bytes_transferred, const std::weak_ptr awpWatcher, TcpConnection *const connection, const int fd) @@ -144,8 +147,8 @@ private: connection->process(fd, AMQP::readable); _read_pending = true; - - _socket.async_read_some(boost::asio::null_buffers(), + + _socket.async_read_some(boost::asio::null_buffers(), STRAND_SOCKET_HANDLER( boost::bind(&Watcher::read_handler, this, @@ -313,13 +316,13 @@ private: * @var class boost::asio::io_service& */ boost::asio::io_service & _ioservice; - + /** * The boost asio io_service::strand managed pointer. * @var class std::shared_ptr */ std::weak_ptr _strand; - + /** * The boost asynchronous deadline timer. * @var class boost::asio::deadline_timer @@ -483,7 +486,7 @@ private: std::make_shared(_ioservice, _strand, fd); _watchers[fd] = apWatcher; - + // explicitly set the events to monitor apWatcher->events(connection, fd, flags); } diff --git a/tests/libboostasio.cpp b/tests/libboostasio.cpp new file mode 100644 index 0000000..ee9555c --- /dev/null +++ b/tests/libboostasio.cpp @@ -0,0 +1,59 @@ +/** + * LibBoostAsio.cpp + * + * Test program to check AMQP functionality based on Boost's asio io_service. + * + * @author Gavin Smith + * + * Compile with g++ libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp + */ + +/** + * Dependencies + */ +#include +#include +#include + + +#include +#include + +/** + * Main program + * @return int + */ +int main() +{ + + // access to the boost asio handler + // note: we suggest use of 2 threads - normally one is fin (we are simply demonstrating thread safety). + boost::asio::io_service service(4); + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libev + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + + // we need a channel too + AMQP::TcpChannel channel(&connection); + + // create a temporary queue + channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + + // report the name of the temporary queue + std::cout << "declared queue " << name << std::endl; + + // now we can close the connection + connection.close(); + }); + + // run the handler + // a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API. + return service.run(); +} +