Added strand to LibBoostAsioHandler.h to ensure that 1 thread goes through the 'gate' (from the io_service set of threads) at any one time.

This commit is contained in:
zerodefect 2017-10-19 15:59:40 +01:00
parent 12ffe885c1
commit a9daf7e769
1 changed files with 110 additions and 53 deletions

View File

@ -16,18 +16,39 @@
/** /**
* Dependencies * Dependencies
*/ */
#include <memory>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp> #include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
// boost::function< void(boost::system::error_code ec) > f = ///////////////////////////////////////////////////////////////////
#define STRAND_HANDLER(_fn) \ #define STRAND_SOCKET_HANDLER(_fn) \
[this, fn = _fn, strand = _strand](const boost::system::error_code ec) \ [fn = _fn, strand = _strand](const boost::system::error_code &ec, \
const std::size_t bytes_transferred) \
{ \ { \
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \ const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
if (!apStrand) { return; } \ if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \
return; \
} \
\
apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \
}
///////////////////////////////////////////////////////////////////
#define STRAND_TIMER_HANDLER(_fn) \
[fn = _fn, strand = _strand](const boost::system::error_code &ec) \
{ \
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \
return; \
} \
\ \
apStrand->dispatch(boost::bind(fn,ec)); \ apStrand->dispatch(boost::bind(fn,ec)); \
} }
@ -58,6 +79,12 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
/**
* The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service>
*/
std::weak_ptr<boost::asio::io_service::strand> _strand;
/** /**
* The boost tcp socket. * The boost tcp socket.
* @var class boost::asio::ip::tcp::socket * @var class boost::asio::ip::tcp::socket
@ -98,7 +125,8 @@ private:
* @param fd The file descriptor being watched. * @param fd The file descriptor being watched.
* @note The handler will get called if a read is cancelled. * @note The handler will get called if a read is cancelled.
*/ */
void read_handler(const boost::system::error_code ec, void read_handler(const boost::system::error_code &ec,
const std::size_t bytes_transferred,
const std::weak_ptr<Watcher> awpWatcher, const std::weak_ptr<Watcher> awpWatcher,
TcpConnection *const connection, TcpConnection *const connection,
const int fd) const int fd)
@ -117,17 +145,19 @@ private:
_read_pending = true; _read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, STRAND_SOCKET_HANDLER(
this, boost::bind(&Watcher::read_handler,
boost::placeholders::_1, this,
boost::placeholders::_1,
boost::placeholders::_2,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
fd)); fd)));
} }
} }
@ -140,6 +170,7 @@ private:
* @note The handler will get called if a write is cancelled. * @note The handler will get called if a write is cancelled.
*/ */
void write_handler(const boost::system::error_code ec, void write_handler(const boost::system::error_code ec,
const std::size_t bytes_transferred,
const std::weak_ptr<Watcher> awpWatcher, const std::weak_ptr<Watcher> awpWatcher,
TcpConnection *const connection, TcpConnection *const connection,
const int fd) const int fd)
@ -158,17 +189,19 @@ private:
_write_pending = true; _write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, STRAND_SOCKET_HANDLER(
this, boost::bind(&Watcher::write_handler,
boost::placeholders::_1, this,
boost::placeholders::_1,
boost::placeholders::_2,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
fd)); fd)));
} }
} }
@ -179,8 +212,11 @@ private:
* @param io_service The boost io_service * @param io_service The boost io_service
* @param fd The filedescriptor being watched * @param fd The filedescriptor being watched
*/ */
Watcher(boost::asio::io_service &io_service, const int fd) : Watcher(boost::asio::io_service &io_service,
const std::weak_ptr<boost::asio::io_service::strand> strand,
const int fd) :
_ioservice(io_service), _ioservice(io_service),
_strand(strand),
_socket(_ioservice) _socket(_ioservice)
{ {
_socket.assign(fd); _socket.assign(fd);
@ -221,17 +257,19 @@ private:
_read_pending = true; _read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(), _socket.async_read_some(boost::asio::null_buffers(),
boost::bind(&Watcher::read_handler, STRAND_SOCKET_HANDLER(
this, boost::bind(&Watcher::read_handler,
boost::placeholders::_1, this,
boost::placeholders::_1,
boost::placeholders::_2,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
fd)); fd)));
} }
// 2. Handle writes? // 2. Handle writes?
@ -243,17 +281,19 @@ private:
_write_pending = true; _write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(), _socket.async_write_some(boost::asio::null_buffers(),
boost::bind(&Watcher::write_handler, STRAND_SOCKET_HANDLER(
this, boost::bind(&Watcher::write_handler,
boost::placeholders::_1, this,
boost::placeholders::_1,
boost::placeholders::_2,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
fd)); fd)));
} }
} }
}; };
@ -271,6 +311,12 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
/**
* The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service>
*/
std::weak_ptr<boost::asio::io_service::strand> _strand;
/** /**
* The boost asynchronous deadline timer. * The boost asynchronous deadline timer.
* @var class boost::asio::deadline_timer * @var class boost::asio::deadline_timer
@ -305,17 +351,18 @@ private:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Posts the timer event // Posts the timer event
_timer.async_wait(boost::bind(&Timer::timeout, _timer.async_wait(STRAND_TIMER_HANDLER(
this, boost::bind(&Timer::timeout,
boost::placeholders::_1, this,
boost::placeholders::_1,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
timeout)); timeout)));
} }
} }
@ -333,8 +380,10 @@ private:
* Constructor * Constructor
* @param loop The current event loop * @param loop The current event loop
*/ */
Timer(boost::asio::io_service &io_service) : Timer(boost::asio::io_service &io_service,
const std::weak_ptr<boost::asio::io_service::strand> strand) :
_ioservice(io_service), _ioservice(io_service),
_strand(strand),
_timer(_ioservice) _timer(_ioservice)
{ {
@ -368,17 +417,18 @@ private:
stop(); stop();
_timer.expires_from_now(boost::posix_time::seconds(timeout)); _timer.expires_from_now(boost::posix_time::seconds(timeout));
_timer.async_wait(boost::bind(&Timer::timeout, _timer.async_wait(STRAND_TIMER_HANDLER(
this, boost::bind(&Timer::timeout,
boost::placeholders::_1, this,
boost::placeholders::_1,
// C++17 has 'weak_from_this()' support. // C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L #if __cplusplus >= 201701L
weak_from_this() weak_from_this()
#else #else
shared_from_this(), shared_from_this(),
#endif #endif
connection, connection,
timeout)); timeout)));
} }
}; };
@ -388,6 +438,12 @@ private:
*/ */
boost::asio::io_service & _ioservice; boost::asio::io_service & _ioservice;
/**
* The boost asio io_service::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_service>
*/
std::shared_ptr<boost::asio::io_service::strand> _strand;
/** /**
* All I/O watchers that are active, indexed by their filedescriptor * All I/O watchers that are active, indexed by their filedescriptor
@ -420,7 +476,7 @@ private:
// construct a new pair (watcher/timer), and put it in the map // construct a new pair (watcher/timer), and put it in the map
const std::shared_ptr<Watcher> apWatcher = const std::shared_ptr<Watcher> apWatcher =
std::make_shared<Watcher>(_ioservice, fd); std::make_shared<Watcher>(_ioservice, _strand, fd);
_watchers[fd] = apWatcher; _watchers[fd] = apWatcher;
@ -473,7 +529,8 @@ public:
*/ */
explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : explicit LibBoostAsioHandler(boost::asio::io_service &io_service) :
_ioservice(io_service), _ioservice(io_service),
_timer(std::make_shared<Timer>(_ioservice)) _strand(std::make_shared<boost::asio::io_service::strand>(_ioservice)),
_timer(std::make_shared<Timer>(_ioservice,_strand))
{ {
} }