Cleaned up the handler upon conducting some additional testing.

This commit is contained in:
zerodefect 2017-10-12 17:11:38 +01:00
parent e9cb9fbe92
commit d1b2139af0
1 changed files with 100 additions and 21 deletions

View File

@ -13,6 +13,14 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/bind.hpp>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -20,7 +28,7 @@ namespace AMQP {
/** /**
* Class definition * Class definition
* @note Because of a limitation with boost::asio on Windows, this will only work on POSIX based systems - see https://github.com/chriskohlhoff/asio/issues/70 * @note Because of a limitation on Windows, this will only work on POSIX based systems - see https://github.com/chriskohlhoff/asio/issues/70
*/ */
class LibBoostAsioHandler : public virtual TcpHandler class LibBoostAsioHandler : public virtual TcpHandler
{ {
@ -29,7 +37,7 @@ private:
/** /**
* Helper class that wraps a boost io_service socket monitor. * Helper class that wraps a boost io_service socket monitor.
*/ */
class Watcher class Watcher : public virtual std::enable_shared_from_this<Watcher>
{ {
private: private:
@ -51,31 +59,62 @@ private:
* A boolean that indicates if the watcher is monitoring for read events. * A boolean that indicates if the watcher is monitoring for read events.
* @var _read True if reads are being monitored else false. * @var _read True if reads are being monitored else false.
*/ */
bool _read{false}; bool _read{false};
/**
* A boolean that indicates if the watcher has a pending read event.
* @var _read True if read is pending else false.
*/
bool _read_pending{false};
/** /**
* A boolean that indicates if the watcher is monitoring for write events. * A boolean that indicates if the watcher is monitoring for write events.
* @var _read True if writes are being monitored else false. * @var _read True if writes are being monitored else false.
*/ */
bool _write{false}; bool _write{false};
/**
* A boolean that indicates if the watcher has a pending write event.
* @var _read True if read is pending else false.
*/
bool _write_pending{false};
/** /**
* Handler method that is called by boost's io_service when the socket pumps a read event. * Handler method that is called by boost's io_service when the socket pumps a read event.
* @param ec The status of the callback. * @param ec The status of the callback.
* @param awpWatcher A weak pointer to this object.
* @param connection The connection being watched. * @param connection The connection being watched.
* @param fd The file descriptor being watched. * @param fd The file descriptor being watched.
* @note The handler will get called if a read is cancelled. * @note The handler will get called if a read is cancelled.
*/ */
void read_handler(boost::system::error_code ec, TcpConnection *connection, int fd) void read_handler(boost::system::error_code ec,
std::weak_ptr<Watcher> awpWatcher,
TcpConnection *connection,
int fd)
{ {
if (!ec && _read) // Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apWatcher = awpWatcher.lock();
if (!apWatcher) { return; }
_read_pending = false;
if ((!ec || ec == boost::asio::error::would_block) && _read)
{ {
connection->process(fd, AMQP::readable); connection->process(fd, AMQP::readable);
_read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, boost::bind(&Watcher::read_handler,
this, this,
boost::placeholders::_1, boost::placeholders::_1,
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this()
#else
shared_from_this(),
#endif
connection, connection,
fd)); fd));
} }
@ -84,20 +123,39 @@ private:
/** /**
* Handler method that is called by boost's io_service when the socket pumps a write event. * Handler method that is called by boost's io_service when the socket pumps a write event.
* @param ec The status of the callback. * @param ec The status of the callback.
* @param awpWatcher A weak pointer to this object.
* @param connection The connection being watched. * @param connection The connection being watched.
* @param fd The file descriptor being watched. * @param fd The file descriptor being watched.
* @note The handler will get called if a write is cancelled. * @note The handler will get called if a write is cancelled.
*/ */
void write_handler(boost::system::error_code ec, TcpConnection *connection, int fd) void write_handler(boost::system::error_code ec,
std::weak_ptr<Watcher> awpWatcher,
TcpConnection *connection,
int fd)
{ {
if (!ec && _write) // Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apWatcher = awpWatcher.lock();
if (!apWatcher) { return; }
_write_pending = false;
if ((!ec || ec == boost::asio::error::would_block) && _write)
{ {
connection->process(fd, AMQP::writable); connection->process(fd, AMQP::writable);
_write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, boost::bind(&Watcher::write_handler,
this, this,
boost::placeholders::_1, boost::placeholders::_1,
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this()
#else
shared_from_this(),
#endif
connection, connection,
fd)); fd));
} }
@ -105,13 +163,12 @@ private:
public: public:
/** /**
* Constructor * Constructor- initialises the watcher and assigns the filedescriptor to
* a boost socket for monitoring.
* @param io_service The boost io_service * @param io_service The boost io_service
* @param connection The connection being watched
* @param fd The filedescriptor being watched * @param fd The filedescriptor being watched
* @param events The events that should be monitored
*/ */
Watcher(boost::asio::io_service &io_service, TcpConnection *connection, int fd, int events) : Watcher(boost::asio::io_service &io_service, int fd) :
_ioservice(io_service), _ioservice(io_service),
_socket(_ioservice) _socket(_ioservice)
{ {
@ -119,8 +176,6 @@ private:
_socket.non_blocking(true); _socket.non_blocking(true);
// configure monitoring
this->events(connection,fd,events);
} }
/** /**
@ -136,6 +191,8 @@ private:
*/ */
~Watcher() ~Watcher()
{ {
_read = false;
_write = false;
_socket.release(); _socket.release();
} }
@ -145,18 +202,23 @@ private:
*/ */
void events(TcpConnection *connection, int fd, int events) void events(TcpConnection *connection, int fd, int events)
{ {
bool bRead(_read);
bool bWrite(_write);
// 1. Handle reads? // 1. Handle reads?
_read = ((events & AMQP::readable) != 0); _read = ((events & AMQP::readable) != 0);
if (!bRead && _read) if (_read && !_read_pending)
{ {
_read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, boost::bind(&Watcher::read_handler,
this, this,
boost::placeholders::_1, boost::placeholders::_1,
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this()
#else
shared_from_this(),
#endif
connection, connection,
fd)); fd));
} }
@ -164,12 +226,20 @@ private:
// 2. Handle writes? // 2. Handle writes?
_write = ((events & AMQP::writable) != 0); _write = ((events & AMQP::writable) != 0);
if (!bWrite && _write) if (_write && !_write_pending)
{ {
_write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, boost::bind(&Watcher::write_handler,
this, this,
boost::placeholders::_1, boost::placeholders::_1,
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this()
#else
shared_from_this(),
#endif
connection, connection,
fd)); fd));
} }
@ -187,7 +257,7 @@ private:
* All I/O watchers that are active, indexed by their filedescriptor * All I/O watchers that are active, indexed by their filedescriptor
* @var std::map<int,Watcher> * @var std::map<int,Watcher>
*/ */
std::map<int, std::unique_ptr<Watcher> > _watchers; std::map<int, std::shared_ptr<Watcher> > _watchers;
/** /**
@ -208,7 +278,8 @@ private:
if (flags == 0){ return; } if (flags == 0){ return; }
// construct a new pair (watcher/timer), and put it in the map // construct a new pair (watcher/timer), and put it in the map
_watchers[fd] = std::make_unique<Watcher>(_ioservice, connection, fd, flags); _watchers[fd] = std::make_shared<Watcher>(_ioservice, fd);
_watchers[fd]->events(connection, fd, flags);
} }
else if (flags == 0) else if (flags == 0)
{ {
@ -246,6 +317,14 @@ public:
LibBoostAsioHandler(LibBoostAsioHandler &&that) = delete; LibBoostAsioHandler(LibBoostAsioHandler &&that) = delete;
LibBoostAsioHandler(const LibBoostAsioHandler &that) = delete; LibBoostAsioHandler(const LibBoostAsioHandler &that) = delete;
/**
* Returns a reference to the boost io_service object that is being used.
* @return The boost io_service object.
*/
boost::asio::io_service &service()
{
return _ioservice;
}
/** /**
* Destructor * Destructor