Cleaned up existing boost handler. There were a few bits that needed tidying up.
This commit is contained in:
parent
9559dfb880
commit
12ffe885c1
|
|
@ -19,8 +19,19 @@
|
||||||
#include <boost/asio/io_service.hpp>
|
#include <boost/asio/io_service.hpp>
|
||||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||||
#include <boost/bind.hpp>
|
#include <boost/bind.hpp>
|
||||||
|
#include <boost/function.hpp>
|
||||||
|
|
||||||
|
|
||||||
|
// boost::function< void(boost::system::error_code ec) > f =
|
||||||
|
#define STRAND_HANDLER(_fn) \
|
||||||
|
[this, fn = _fn, strand = _strand](const boost::system::error_code ec) \
|
||||||
|
{ \
|
||||||
|
const std::shared_ptr<boost::asio::io_service::strand> apStrand = strand.lock(); \
|
||||||
|
if (!apStrand) { return; } \
|
||||||
|
\
|
||||||
|
apStrand->dispatch(boost::bind(fn,ec)); \
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
*/
|
*/
|
||||||
|
|
@ -87,10 +98,10 @@ private:
|
||||||
* @param fd The file descriptor being watched.
|
* @param fd The file descriptor being watched.
|
||||||
* @note The handler will get called if a read is cancelled.
|
* @note The handler will get called if a read is cancelled.
|
||||||
*/
|
*/
|
||||||
void read_handler(boost::system::error_code ec,
|
void read_handler(const boost::system::error_code ec,
|
||||||
std::weak_ptr<Watcher> awpWatcher,
|
const std::weak_ptr<Watcher> awpWatcher,
|
||||||
TcpConnection *connection,
|
TcpConnection *const connection,
|
||||||
int fd)
|
const int fd)
|
||||||
{
|
{
|
||||||
// Resolve any potential problems with dangling pointers
|
// Resolve any potential problems with dangling pointers
|
||||||
// (remember we are using async).
|
// (remember we are using async).
|
||||||
|
|
@ -128,10 +139,10 @@ private:
|
||||||
* @param fd The file descriptor being watched.
|
* @param fd The file descriptor being watched.
|
||||||
* @note The handler will get called if a write is cancelled.
|
* @note The handler will get called if a write is cancelled.
|
||||||
*/
|
*/
|
||||||
void write_handler(boost::system::error_code ec,
|
void write_handler(const boost::system::error_code ec,
|
||||||
std::weak_ptr<Watcher> awpWatcher,
|
const std::weak_ptr<Watcher> awpWatcher,
|
||||||
TcpConnection *connection,
|
TcpConnection *const connection,
|
||||||
int fd)
|
const int fd)
|
||||||
{
|
{
|
||||||
// Resolve any potential problems with dangling pointers
|
// Resolve any potential problems with dangling pointers
|
||||||
// (remember we are using async).
|
// (remember we are using async).
|
||||||
|
|
@ -168,14 +179,13 @@ private:
|
||||||
* @param io_service The boost io_service
|
* @param io_service The boost io_service
|
||||||
* @param fd The filedescriptor being watched
|
* @param fd The filedescriptor being watched
|
||||||
*/
|
*/
|
||||||
Watcher(boost::asio::io_service &io_service, int fd) :
|
Watcher(boost::asio::io_service &io_service, const int fd) :
|
||||||
_ioservice(io_service),
|
_ioservice(io_service),
|
||||||
_socket(_ioservice)
|
_socket(_ioservice)
|
||||||
{
|
{
|
||||||
_socket.assign(fd);
|
_socket.assign(fd);
|
||||||
|
|
||||||
_socket.non_blocking(true);
|
_socket.non_blocking(true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -205,6 +215,7 @@ private:
|
||||||
// 1. Handle reads?
|
// 1. Handle reads?
|
||||||
_read = ((events & AMQP::readable) != 0);
|
_read = ((events & AMQP::readable) != 0);
|
||||||
|
|
||||||
|
// Read requsted but no read pending?
|
||||||
if (_read && !_read_pending)
|
if (_read && !_read_pending)
|
||||||
{
|
{
|
||||||
_read_pending = true;
|
_read_pending = true;
|
||||||
|
|
@ -226,6 +237,7 @@ private:
|
||||||
// 2. Handle writes?
|
// 2. Handle writes?
|
||||||
_write = ((events & AMQP::writable) != 0);
|
_write = ((events & AMQP::writable) != 0);
|
||||||
|
|
||||||
|
// Write requested but no write pending?
|
||||||
if (_write && !_write_pending)
|
if (_write && !_write_pending)
|
||||||
{
|
{
|
||||||
_write_pending = true;
|
_write_pending = true;
|
||||||
|
|
@ -247,8 +259,8 @@ private:
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Timer class to periodically fire a heartbeat
|
* Timer class to periodically fire a heartbeat
|
||||||
*/
|
*/
|
||||||
class Timer : public std::enable_shared_from_this<Timer>
|
class Timer : public std::enable_shared_from_this<Timer>
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
|
|
@ -258,7 +270,7 @@ private:
|
||||||
* @var class boost::asio::io_service&
|
* @var class boost::asio::io_service&
|
||||||
*/
|
*/
|
||||||
boost::asio::io_service & _ioservice;
|
boost::asio::io_service & _ioservice;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The boost asynchronous deadline timer.
|
* The boost asynchronous deadline timer.
|
||||||
* @var class boost::asio::deadline_timer
|
* @var class boost::asio::deadline_timer
|
||||||
|
|
@ -273,8 +285,8 @@ private:
|
||||||
*/
|
*/
|
||||||
void timeout(const boost::system::error_code &ec,
|
void timeout(const boost::system::error_code &ec,
|
||||||
std::weak_ptr<Timer> awpThis,
|
std::weak_ptr<Timer> awpThis,
|
||||||
TcpConnection *connection,
|
TcpConnection *const connection,
|
||||||
uint16_t timeout)
|
const uint16_t timeout)
|
||||||
{
|
{
|
||||||
// Resolve any potential problems with dangling pointers
|
// Resolve any potential problems with dangling pointers
|
||||||
// (remember we are using async).
|
// (remember we are using async).
|
||||||
|
|
@ -289,11 +301,21 @@ private:
|
||||||
connection->heartbeat();
|
connection->heartbeat();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reschedule the timer for 1 second in the future:
|
// Reschedule the timer for the future:
|
||||||
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
||||||
|
|
||||||
// Posts the timer event
|
// Posts the timer event
|
||||||
_timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(), connection, timeout));
|
_timer.async_wait(boost::bind(&Timer::timeout,
|
||||||
|
this,
|
||||||
|
boost::placeholders::_1,
|
||||||
|
// C++17 has 'weak_from_this()' support.
|
||||||
|
#if __cplusplus >= 201701L
|
||||||
|
weak_from_this()
|
||||||
|
#else
|
||||||
|
shared_from_this(),
|
||||||
|
#endif
|
||||||
|
connection,
|
||||||
|
timeout));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -345,9 +367,18 @@ private:
|
||||||
// stop timer in case it was already set
|
// stop timer in case it was already set
|
||||||
stop();
|
stop();
|
||||||
|
|
||||||
|
|
||||||
_timer.expires_from_now(boost::posix_time::seconds(timeout));
|
_timer.expires_from_now(boost::posix_time::seconds(timeout));
|
||||||
_timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(),connection, timeout));
|
_timer.async_wait(boost::bind(&Timer::timeout,
|
||||||
|
this,
|
||||||
|
boost::placeholders::_1,
|
||||||
|
// C++17 has 'weak_from_this()' support.
|
||||||
|
#if __cplusplus >= 201701L
|
||||||
|
weak_from_this()
|
||||||
|
#else
|
||||||
|
shared_from_this(),
|
||||||
|
#endif
|
||||||
|
connection,
|
||||||
|
timeout));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -374,7 +405,9 @@ private:
|
||||||
* @param fd The filedescriptor to be monitored
|
* @param fd The filedescriptor to be monitored
|
||||||
* @param flags Should the object be monitored for readability or writability?
|
* @param flags Should the object be monitored for readability or writability?
|
||||||
*/
|
*/
|
||||||
void monitor(TcpConnection *connection, int fd, int flags) override
|
void monitor(TcpConnection *const connection,
|
||||||
|
const int fd,
|
||||||
|
const int flags) override
|
||||||
{
|
{
|
||||||
// do we already have this filedescriptor
|
// do we already have this filedescriptor
|
||||||
auto iter = _watchers.find(fd);
|
auto iter = _watchers.find(fd);
|
||||||
|
|
@ -386,8 +419,13 @@ private:
|
||||||
if (flags == 0){ return; }
|
if (flags == 0){ return; }
|
||||||
|
|
||||||
// construct a new pair (watcher/timer), and put it in the map
|
// construct a new pair (watcher/timer), and put it in the map
|
||||||
_watchers[fd] = std::make_shared<Watcher>(_ioservice, fd);
|
const std::shared_ptr<Watcher> apWatcher =
|
||||||
_watchers[fd]->events(connection, fd, flags);
|
std::make_shared<Watcher>(_ioservice, fd);
|
||||||
|
|
||||||
|
_watchers[fd] = apWatcher;
|
||||||
|
|
||||||
|
// explicitly set the events to monitor
|
||||||
|
apWatcher->events(connection, fd, flags);
|
||||||
}
|
}
|
||||||
else if (flags == 0)
|
else if (flags == 0)
|
||||||
{
|
{
|
||||||
|
|
@ -431,7 +469,7 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param loop The boost io_service to wrap
|
* @param io_service The boost io_service to wrap
|
||||||
*/
|
*/
|
||||||
explicit LibBoostAsioHandler(boost::asio::io_service &io_service) :
|
explicit LibBoostAsioHandler(boost::asio::io_service &io_service) :
|
||||||
_ioservice(io_service),
|
_ioservice(io_service),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue