From d1b2139af05b215550386dc9792b51130f384190 Mon Sep 17 00:00:00 2001 From: zerodefect Date: Thu, 12 Oct 2017 17:11:38 +0100 Subject: [PATCH] Cleaned up the handler upon conducting some additional testing. --- include/libboostasio.h | 121 ++++++++++++++++++++++++++++++++++------- 1 file changed, 100 insertions(+), 21 deletions(-) diff --git a/include/libboostasio.h b/include/libboostasio.h index dc815ec..897beb3 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -13,6 +13,14 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include +#include + + /** * Set up namespace */ @@ -20,7 +28,7 @@ namespace AMQP { /** * Class definition - * @note Because of a limitation with boost::asio on Windows, this will only work on POSIX based systems - see https://github.com/chriskohlhoff/asio/issues/70 + * @note Because of a limitation on Windows, this will only work on POSIX based systems - see https://github.com/chriskohlhoff/asio/issues/70 */ class LibBoostAsioHandler : public virtual TcpHandler { @@ -29,7 +37,7 @@ private: /** * Helper class that wraps a boost io_service socket monitor. */ - class Watcher + class Watcher : public virtual std::enable_shared_from_this { private: @@ -51,31 +59,62 @@ private: * A boolean that indicates if the watcher is monitoring for read events. * @var _read True if reads are being monitored else false. */ - bool _read{false}; + bool _read{false}; + + /** + * A boolean that indicates if the watcher has a pending read event. + * @var _read True if read is pending else false. + */ + bool _read_pending{false}; /** * A boolean that indicates if the watcher is monitoring for write events. * @var _read True if writes are being monitored else false. */ - bool _write{false}; + bool _write{false}; + + /** + * A boolean that indicates if the watcher has a pending write event. + * @var _read True if read is pending else false. + */ + bool _write_pending{false}; /** * Handler method that is called by boost's io_service when the socket pumps a read event. * @param ec The status of the callback. + * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. * @note The handler will get called if a read is cancelled. */ - void read_handler(boost::system::error_code ec, TcpConnection *connection, int fd) + void read_handler(boost::system::error_code ec, + std::weak_ptr awpWatcher, + TcpConnection *connection, + int fd) { - if (!ec && _read) + // Resolve any potential problems with dangling pointers + // (remember we are using async). + const std::shared_ptr apWatcher = awpWatcher.lock(); + if (!apWatcher) { return; } + + _read_pending = false; + + if ((!ec || ec == boost::asio::error::would_block) && _read) { connection->process(fd, AMQP::readable); + _read_pending = true; + _socket.async_read_some(boost::asio::null_buffers(), boost::bind(&Watcher::read_handler, this, boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif connection, fd)); } @@ -84,20 +123,39 @@ private: /** * Handler method that is called by boost's io_service when the socket pumps a write event. * @param ec The status of the callback. + * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. * @note The handler will get called if a write is cancelled. */ - void write_handler(boost::system::error_code ec, TcpConnection *connection, int fd) + void write_handler(boost::system::error_code ec, + std::weak_ptr awpWatcher, + TcpConnection *connection, + int fd) { - if (!ec && _write) + // Resolve any potential problems with dangling pointers + // (remember we are using async). + const std::shared_ptr apWatcher = awpWatcher.lock(); + if (!apWatcher) { return; } + + _write_pending = false; + + if ((!ec || ec == boost::asio::error::would_block) && _write) { connection->process(fd, AMQP::writable); + _write_pending = true; + _socket.async_write_some(boost::asio::null_buffers(), boost::bind(&Watcher::write_handler, this, boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif connection, fd)); } @@ -105,13 +163,12 @@ private: public: /** - * Constructor + * Constructor- initialises the watcher and assigns the filedescriptor to + * a boost socket for monitoring. * @param io_service The boost io_service - * @param connection The connection being watched * @param fd The filedescriptor being watched - * @param events The events that should be monitored */ - Watcher(boost::asio::io_service &io_service, TcpConnection *connection, int fd, int events) : + Watcher(boost::asio::io_service &io_service, int fd) : _ioservice(io_service), _socket(_ioservice) { @@ -119,8 +176,6 @@ private: _socket.non_blocking(true); - // configure monitoring - this->events(connection,fd,events); } /** @@ -136,6 +191,8 @@ private: */ ~Watcher() { + _read = false; + _write = false; _socket.release(); } @@ -145,18 +202,23 @@ private: */ void events(TcpConnection *connection, int fd, int events) { - bool bRead(_read); - bool bWrite(_write); - // 1. Handle reads? _read = ((events & AMQP::readable) != 0); - if (!bRead && _read) + if (_read && !_read_pending) { + _read_pending = true; + _socket.async_read_some(boost::asio::null_buffers(), boost::bind(&Watcher::read_handler, this, boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif connection, fd)); } @@ -164,12 +226,20 @@ private: // 2. Handle writes? _write = ((events & AMQP::writable) != 0); - if (!bWrite && _write) + if (_write && !_write_pending) { + _write_pending = true; + _socket.async_write_some(boost::asio::null_buffers(), boost::bind(&Watcher::write_handler, this, boost::placeholders::_1, +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this() +#else + shared_from_this(), +#endif connection, fd)); } @@ -187,7 +257,7 @@ private: * All I/O watchers that are active, indexed by their filedescriptor * @var std::map */ - std::map > _watchers; + std::map > _watchers; /** @@ -208,7 +278,8 @@ private: if (flags == 0){ return; } // construct a new pair (watcher/timer), and put it in the map - _watchers[fd] = std::make_unique(_ioservice, connection, fd, flags); + _watchers[fd] = std::make_shared(_ioservice, fd); + _watchers[fd]->events(connection, fd, flags); } else if (flags == 0) { @@ -246,6 +317,14 @@ public: LibBoostAsioHandler(LibBoostAsioHandler &&that) = delete; LibBoostAsioHandler(const LibBoostAsioHandler &that) = delete; + /** + * Returns a reference to the boost io_service object that is being used. + * @return The boost io_service object. + */ + boost::asio::io_service &service() + { + return _ioservice; + } /** * Destructor