Merge pull request #165 from lion10243/libboostasio-fix

Libboostasio fix
This commit is contained in:
Emiel Bruijntjes 2018-01-25 09:22:42 +01:00 committed by GitHub
commit 0f76aac40f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 112 additions and 112 deletions

View File

@ -26,35 +26,12 @@
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/bind.hpp>
///////////////////////////////////////////////////////////////////
#define STRAND_SOCKET_HANDLER(_fn) \
[fn = _fn, strand = _strand](const boost::system::error_code &ec, \
const std::size_t bytes_transferred) \
{ \
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \
return; \
} \
\
apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \
}
///////////////////////////////////////////////////////////////////
#define STRAND_TIMER_HANDLER(_fn) \
[fn = _fn, strand = _strand](const boost::system::error_code &ec) \
{ \
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
if (!apStrand) \
{ \
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \
return; \
} \
\
apStrand->dispatch(boost::bind(fn,ec)); \
}
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
#define PTR_FROM_THIS weak_from_this
#else
#define PTR_FROM_THIS shared_from_this
#endif
/**
* Set up namespace
@ -120,6 +97,64 @@ private:
*/
bool _write_pending{false};
using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
/**
* Builds a io handler callback that executes the io callback in a strand.
* @param io_handler The handler callback to dispatch
* @return handler_cb A function wrapping the execution of the handler function in a io_service::strand.
*/
handler_cb get_dispatch_wrapper(io_handler fn)
{
return [fn, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred)
{
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock();
if (!apStrand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0});
return;
}
apStrand->dispatch(boost::bind(fn, ec, bytes_transferred));
};
}
/**
* Binds and returns a read handler for the io operation.
* @param connection The connection being watched.
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_read_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::read_handler,
this,
_1,
_2,
PTR_FROM_THIS(),
connection,
fd);
return get_dispatch_wrapper(fn);
}
/**
* Binds and returns a read handler for the io operation.
* @param connection The connection being watched.
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_write_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::write_handler,
this,
_1,
_2,
PTR_FROM_THIS(),
connection,
fd);
return get_dispatch_wrapper(fn);
}
/**
* Handler method that is called by boost's io_service when the socket pumps a read event.
* @param ec The status of the callback.
@ -147,21 +182,10 @@ private:
connection->process(fd, AMQP::readable);
_read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(),
STRAND_SOCKET_HANDLER(
boost::bind(&Watcher::read_handler,
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
_socket.async_read_some(
boost::asio::null_buffers(),
get_read_handler(connection, fd));
}
}
@ -193,20 +217,9 @@ private:
_write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(),
STRAND_SOCKET_HANDLER(
boost::bind(&Watcher::write_handler,
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
_socket.async_write_some(
boost::asio::null_buffers(),
get_write_handler(connection, fd));
}
}
@ -262,20 +275,9 @@ private:
{
_read_pending = true;
_socket.async_read_some(boost::asio::null_buffers(),
STRAND_SOCKET_HANDLER(
boost::bind(&Watcher::read_handler,
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
_socket.async_read_some(
boost::asio::null_buffers(),
get_read_handler(connection, fd));
}
// 2. Handle writes?
@ -286,20 +288,9 @@ private:
{
_write_pending = true;
_socket.async_write_some(boost::asio::null_buffers(),
STRAND_SOCKET_HANDLER(
boost::bind(&Watcher::write_handler,
this,
boost::arg<1>(),
boost::arg<2>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
fd)));
_socket.async_write_some(
boost::asio::null_buffers(),
get_write_handler(connection, fd));
}
}
};
@ -329,11 +320,39 @@ private:
*/
boost::asio::deadline_timer _timer;
using handler_fn = boost::function<void(boost::system::error_code)>;
/**
* Binds and returns a lamba function handler for the io operation.
* @param connection The connection being watched.
* @param timeout The file descripter being watched.
* @return handler callback
*/
handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout)
{
auto fn = boost::bind(&Timer::timeout,
this,
_1,
PTR_FROM_THIS(),
connection,
timeout);
return [fn, strand = _strand](const boost::system::error_code &ec)
{
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock();
if (!apStrand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
apStrand->dispatch(boost::bind(fn, ec));
};
}
/**
* Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param loop The loop in which the event was triggered
* @param timer Internal timer object
* @param revents The events that triggered this call
* @param connection
* @param timeout
*/
void timeout(const boost::system::error_code &ec,
std::weak_ptr<Timer> awpThis,
@ -357,18 +376,7 @@ private:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Posts the timer event
_timer.async_wait(STRAND_TIMER_HANDLER(
boost::bind(&Timer::timeout,
this,
boost::arg<1>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
timeout)));
_timer.async_wait(get_handler(connection, timeout));
}
}
@ -423,19 +431,11 @@ private:
// stop timer in case it was already set
stop();
// Reschedule the timer for the future:
_timer.expires_from_now(boost::posix_time::seconds(timeout));
_timer.async_wait(STRAND_TIMER_HANDLER(
boost::bind(&Timer::timeout,
this,
boost::arg<1>(),
// C++17 has 'weak_from_this()' support.
#if __cplusplus >= 201701L
weak_from_this(),
#else
shared_from_this(),
#endif
connection,
timeout)));
// Posts the timer event
_timer.async_wait(get_handler(connection, timeout));
}
};