add PTR_FROM_THIS define; remove io_handler code duplication via read/write/dispatch wrapper functions.
This commit is contained in:
parent
d71040b2e5
commit
8477fbb272
|
|
@ -26,37 +26,12 @@
|
||||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||||
#include <boost/bind.hpp>
|
#include <boost/bind.hpp>
|
||||||
|
|
||||||
|
// C++17 has 'weak_from_this()' support.
|
||||||
template<class Func>
|
#if __cplusplus >= 201701L
|
||||||
void strand_sock_dispatch_func(
|
#define PTR_FROM_THIS weak_from_this
|
||||||
Func fn,
|
#else
|
||||||
std::weak_ptr<boost::asio::io_service::strand> strand,
|
#define PTR_FROM_THIS shared_from_this
|
||||||
const boost::system::error_code &ec,
|
#endif
|
||||||
const std::size_t bytes_transferred
|
|
||||||
) {
|
|
||||||
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock();
|
|
||||||
if (!apStrand)
|
|
||||||
{
|
|
||||||
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
apStrand->dispatch(boost::bind(fn, ec, bytes_transferred));
|
|
||||||
}
|
|
||||||
|
|
||||||
template<class Func>
|
|
||||||
void strand_timer_dispatch_func(
|
|
||||||
Func fn,
|
|
||||||
std::weak_ptr<boost::asio::io_service::strand> strand,
|
|
||||||
const boost::system::error_code &ec
|
|
||||||
) {
|
|
||||||
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock();
|
|
||||||
if (!apStrand)
|
|
||||||
{
|
|
||||||
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
apStrand->dispatch(boost::bind(fn,ec));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -122,6 +97,52 @@ private:
|
||||||
*/
|
*/
|
||||||
bool _write_pending{false};
|
bool _write_pending{false};
|
||||||
|
|
||||||
|
using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
|
||||||
|
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds a io handler callback that executes the io callback in a strand.
|
||||||
|
* @param io_handler The handler callback to dispatch
|
||||||
|
* @return handler_cb A function wrapping the execution of the handler function in a io_service::strand.
|
||||||
|
*/
|
||||||
|
handler_cb get_dispatch_wrapper(io_handler fn)
|
||||||
|
{
|
||||||
|
return [fn, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred)
|
||||||
|
{
|
||||||
|
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock();
|
||||||
|
if (!apStrand)
|
||||||
|
{
|
||||||
|
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
apStrand->dispatch(boost::bind(fn, ec, bytes_transferred));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Binds and returns a read handler for the io operation.
|
||||||
|
* @param connection The connection being watched.
|
||||||
|
* @param fd The file descripter being watched.
|
||||||
|
* @return handler callback
|
||||||
|
*/
|
||||||
|
handler_cb get_read_handler(TcpConnection *const connection, const int fd)
|
||||||
|
{
|
||||||
|
auto fn = boost::bind(&Watcher::read_handler, this, _1, _2, PTR_FROM_THIS(), connection, fd);
|
||||||
|
return get_dispatch_wrapper(fn);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Binds and returns a read handler for the io operation.
|
||||||
|
* @param connection The connection being watched.
|
||||||
|
* @param fd The file descripter being watched.
|
||||||
|
* @return handler callback
|
||||||
|
*/
|
||||||
|
handler_cb get_write_handler(TcpConnection *const connection, const int fd)
|
||||||
|
{
|
||||||
|
auto fn = boost::bind(&Watcher::write_handler, this, _1, _2, PTR_FROM_THIS(), connection, fd);
|
||||||
|
return get_dispatch_wrapper(fn);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handler method that is called by boost's io_service when the socket pumps a read event.
|
* Handler method that is called by boost's io_service when the socket pumps a read event.
|
||||||
* @param ec The status of the callback.
|
* @param ec The status of the callback.
|
||||||
|
|
@ -149,25 +170,8 @@ private:
|
||||||
connection->process(fd, AMQP::readable);
|
connection->process(fd, AMQP::readable);
|
||||||
|
|
||||||
_read_pending = true;
|
_read_pending = true;
|
||||||
|
|
||||||
auto func = boost::bind(&Watcher::read_handler,
|
_socket.async_read_some(boost::asio::null_buffers(), get_read_handler(connection, fd));
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
boost::arg<2>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
fd);
|
|
||||||
_socket.async_read_some(
|
|
||||||
boost::asio::null_buffers(),
|
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) {
|
|
||||||
strand_sock_dispatch_func(f, strand, ec, bytes_transferred);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -199,24 +203,7 @@ private:
|
||||||
|
|
||||||
_write_pending = true;
|
_write_pending = true;
|
||||||
|
|
||||||
auto func = boost::bind(&Watcher::write_handler,
|
_socket.async_write_some(boost::asio::null_buffers(), get_write_handler(connection, fd));
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
boost::arg<2>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
fd);
|
|
||||||
_socket.async_write_some(
|
|
||||||
boost::asio::null_buffers(),
|
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) {
|
|
||||||
strand_sock_dispatch_func(f, strand, ec, bytes_transferred);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -272,24 +259,7 @@ private:
|
||||||
{
|
{
|
||||||
_read_pending = true;
|
_read_pending = true;
|
||||||
|
|
||||||
auto func = boost::bind(&Watcher::read_handler,
|
_socket.async_read_some(boost::asio::null_buffers(), get_read_handler(connection, fd));
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
boost::arg<2>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
fd);
|
|
||||||
_socket.async_read_some(
|
|
||||||
boost::asio::null_buffers(),
|
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) {
|
|
||||||
strand_sock_dispatch_func(f, strand, ec, bytes_transferred);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Handle writes?
|
// 2. Handle writes?
|
||||||
|
|
@ -300,24 +270,7 @@ private:
|
||||||
{
|
{
|
||||||
_write_pending = true;
|
_write_pending = true;
|
||||||
|
|
||||||
auto func = boost::bind(&Watcher::write_handler,
|
_socket.async_write_some(boost::asio::null_buffers(), get_write_handler(connection, fd));
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
boost::arg<2>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
fd);
|
|
||||||
_socket.async_write_some(
|
|
||||||
boost::asio::null_buffers(),
|
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) {
|
|
||||||
strand_sock_dispatch_func(f, strand, ec, bytes_transferred);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -347,11 +300,39 @@ private:
|
||||||
*/
|
*/
|
||||||
boost::asio::deadline_timer _timer;
|
boost::asio::deadline_timer _timer;
|
||||||
|
|
||||||
|
using handler_fn = boost::function<void(boost::system::error_code)>;
|
||||||
|
/**
|
||||||
|
* Binds and returns a lamba function handler for the io operation.
|
||||||
|
* @param connection The connection being watched.
|
||||||
|
* @param timeout The file descripter being watched.
|
||||||
|
* @return handler callback
|
||||||
|
*/
|
||||||
|
handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout)
|
||||||
|
{
|
||||||
|
auto fn = boost::bind(&Timer::timeout,
|
||||||
|
this,
|
||||||
|
_1,
|
||||||
|
PTR_FROM_THIS(),
|
||||||
|
connection,
|
||||||
|
timeout);
|
||||||
|
return [fn, this](const boost::system::error_code &ec)
|
||||||
|
{
|
||||||
|
const std::shared_ptr<boost::asio::io_service::strand> apStrand = _strand.lock();
|
||||||
|
if (!apStrand)
|
||||||
|
{
|
||||||
|
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
apStrand->dispatch(boost::bind(fn, ec));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback method that is called by libev when the timer expires
|
* Callback method that is called by libev when the timer expires
|
||||||
|
* @param ec error code returned from loop
|
||||||
* @param loop The loop in which the event was triggered
|
* @param loop The loop in which the event was triggered
|
||||||
* @param timer Internal timer object
|
* @param connection
|
||||||
* @param revents The events that triggered this call
|
* @param timeout
|
||||||
*/
|
*/
|
||||||
void timeout(const boost::system::error_code &ec,
|
void timeout(const boost::system::error_code &ec,
|
||||||
std::weak_ptr<Timer> awpThis,
|
std::weak_ptr<Timer> awpThis,
|
||||||
|
|
@ -371,27 +352,11 @@ private:
|
||||||
connection->heartbeat();
|
connection->heartbeat();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto func = boost::bind(&Timer::timeout,
|
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
timeout);
|
|
||||||
|
|
||||||
// Reschedule the timer for the future:
|
// Reschedule the timer for the future:
|
||||||
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
||||||
|
|
||||||
// Posts the timer event
|
// Posts the timer event
|
||||||
_timer.async_wait(
|
_timer.async_wait(get_handler(connection, timeout));
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec) {
|
|
||||||
strand_timer_dispatch_func(f, strand, ec);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -446,23 +411,11 @@ private:
|
||||||
// stop timer in case it was already set
|
// stop timer in case it was already set
|
||||||
stop();
|
stop();
|
||||||
|
|
||||||
auto func = boost::bind(&Timer::timeout,
|
// Reschedule the timer for the future:
|
||||||
this,
|
|
||||||
boost::arg<1>(),
|
|
||||||
// C++17 has 'weak_from_this()' support.
|
|
||||||
#if __cplusplus >= 201701L
|
|
||||||
weak_from_this(),
|
|
||||||
#else
|
|
||||||
shared_from_this(),
|
|
||||||
#endif
|
|
||||||
connection,
|
|
||||||
timeout);
|
|
||||||
_timer.expires_from_now(boost::posix_time::seconds(timeout));
|
_timer.expires_from_now(boost::posix_time::seconds(timeout));
|
||||||
_timer.async_wait(
|
|
||||||
[f = func, strand = _strand](const boost::system::error_code &ec) {
|
// Posts the timer event
|
||||||
strand_timer_dispatch_func(f, strand, ec);
|
_timer.async_wait(get_handler(connection, timeout));
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue