diff --git a/include/libboostasio.h b/include/libboostasio.h index dd229f9..10bac80 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -26,37 +26,12 @@ #include #include - -template -void strand_sock_dispatch_func( - Func fn, - std::weak_ptr strand, - const boost::system::error_code &ec, - const std::size_t bytes_transferred -) { - const std::shared_ptr apStrand = strand.lock(); - if (!apStrand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0}); - return; - } - apStrand->dispatch(boost::bind(fn, ec, bytes_transferred)); -} - -template -void strand_timer_dispatch_func( - Func fn, - std::weak_ptr strand, - const boost::system::error_code &ec -) { - const std::shared_ptr apStrand = strand.lock(); - if (!apStrand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); - return; - } - apStrand->dispatch(boost::bind(fn,ec)); -} +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L +#define PTR_FROM_THIS weak_from_this +#else +#define PTR_FROM_THIS shared_from_this +#endif /** * Set up namespace @@ -122,6 +97,52 @@ private: */ bool _write_pending{false}; + using handler_cb = boost::function; + using io_handler = boost::function; + + /** + * Builds a io handler callback that executes the io callback in a strand. + * @param io_handler The handler callback to dispatch + * @return handler_cb A function wrapping the execution of the handler function in a io_service::strand. + */ + handler_cb get_dispatch_wrapper(io_handler fn) + { + return [fn, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) + { + const std::shared_ptr apStrand = strand.lock(); + if (!apStrand) + { + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0}); + return; + } + apStrand->dispatch(boost::bind(fn, ec, bytes_transferred)); + }; + } + + /** + * Binds and returns a read handler for the io operation. + * @param connection The connection being watched. + * @param fd The file descripter being watched. + * @return handler callback + */ + handler_cb get_read_handler(TcpConnection *const connection, const int fd) + { + auto fn = boost::bind(&Watcher::read_handler, this, _1, _2, PTR_FROM_THIS(), connection, fd); + return get_dispatch_wrapper(fn); + } + + /** + * Binds and returns a read handler for the io operation. + * @param connection The connection being watched. + * @param fd The file descripter being watched. + * @return handler callback + */ + handler_cb get_write_handler(TcpConnection *const connection, const int fd) + { + auto fn = boost::bind(&Watcher::write_handler, this, _1, _2, PTR_FROM_THIS(), connection, fd); + return get_dispatch_wrapper(fn); + } + /** * Handler method that is called by boost's io_service when the socket pumps a read event. * @param ec The status of the callback. @@ -149,25 +170,8 @@ private: connection->process(fd, AMQP::readable); _read_pending = true; - - auto func = boost::bind(&Watcher::read_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd); - _socket.async_read_some( - boost::asio::null_buffers(), - [f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) { - strand_sock_dispatch_func(f, strand, ec, bytes_transferred); - } - ); + + _socket.async_read_some(boost::asio::null_buffers(), get_read_handler(connection, fd)); } } @@ -199,24 +203,7 @@ private: _write_pending = true; - auto func = boost::bind(&Watcher::write_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd); - _socket.async_write_some( - boost::asio::null_buffers(), - [f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) { - strand_sock_dispatch_func(f, strand, ec, bytes_transferred); - } - ); + _socket.async_write_some(boost::asio::null_buffers(), get_write_handler(connection, fd)); } } @@ -272,24 +259,7 @@ private: { _read_pending = true; - auto func = boost::bind(&Watcher::read_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd); - _socket.async_read_some( - boost::asio::null_buffers(), - [f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) { - strand_sock_dispatch_func(f, strand, ec, bytes_transferred); - } - ); + _socket.async_read_some(boost::asio::null_buffers(), get_read_handler(connection, fd)); } // 2. Handle writes? @@ -300,24 +270,7 @@ private: { _write_pending = true; - auto func = boost::bind(&Watcher::write_handler, - this, - boost::arg<1>(), - boost::arg<2>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - fd); - _socket.async_write_some( - boost::asio::null_buffers(), - [f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) { - strand_sock_dispatch_func(f, strand, ec, bytes_transferred); - } - ); + _socket.async_write_some(boost::asio::null_buffers(), get_write_handler(connection, fd)); } } }; @@ -347,11 +300,39 @@ private: */ boost::asio::deadline_timer _timer; + using handler_fn = boost::function; + /** + * Binds and returns a lamba function handler for the io operation. + * @param connection The connection being watched. + * @param timeout The file descripter being watched. + * @return handler callback + */ + handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout) + { + auto fn = boost::bind(&Timer::timeout, + this, + _1, + PTR_FROM_THIS(), + connection, + timeout); + return [fn, this](const boost::system::error_code &ec) + { + const std::shared_ptr apStrand = _strand.lock(); + if (!apStrand) + { + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); + return; + } + apStrand->dispatch(boost::bind(fn, ec)); + }; + } + /** * Callback method that is called by libev when the timer expires + * @param ec error code returned from loop * @param loop The loop in which the event was triggered - * @param timer Internal timer object - * @param revents The events that triggered this call + * @param connection + * @param timeout */ void timeout(const boost::system::error_code &ec, std::weak_ptr awpThis, @@ -371,27 +352,11 @@ private: connection->heartbeat(); } - auto func = boost::bind(&Timer::timeout, - this, - boost::arg<1>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - timeout); - // Reschedule the timer for the future: _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); // Posts the timer event - _timer.async_wait( - [f = func, strand = _strand](const boost::system::error_code &ec) { - strand_timer_dispatch_func(f, strand, ec); - } - ); + _timer.async_wait(get_handler(connection, timeout)); } } @@ -446,23 +411,11 @@ private: // stop timer in case it was already set stop(); - auto func = boost::bind(&Timer::timeout, - this, - boost::arg<1>(), -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L - weak_from_this(), -#else - shared_from_this(), -#endif - connection, - timeout); + // Reschedule the timer for the future: _timer.expires_from_now(boost::posix_time::seconds(timeout)); - _timer.async_wait( - [f = func, strand = _strand](const boost::system::error_code &ec) { - strand_timer_dispatch_func(f, strand, ec); - } - ); + + // Posts the timer event + _timer.async_wait(get_handler(connection, timeout)); } };