parent
d5a7f3cdd1
commit
7aa321efb8
|
|
@ -83,6 +83,12 @@ protected:
|
|||
*/
|
||||
boost::asio::posix::stream_descriptor _socket;
|
||||
|
||||
/**
|
||||
* The boost asynchronous deadline timer.
|
||||
* @var class boost::asio::deadline_timer
|
||||
*/
|
||||
boost::asio::deadline_timer _timer;
|
||||
|
||||
/**
|
||||
* A boolean that indicates if the watcher is monitoring for read events.
|
||||
* @var _read True if reads are being monitored else false.
|
||||
|
|
@ -109,6 +115,7 @@ protected:
|
|||
|
||||
using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
|
||||
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
|
||||
using timer_handler = boost::function<void(boost::system::error_code)>;
|
||||
|
||||
/**
|
||||
* Builds a io handler callback that executes the io callback in a strand.
|
||||
|
|
@ -167,6 +174,35 @@ protected:
|
|||
return get_dispatch_wrapper(fn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Binds and returns a lamba function handler for the io operation.
|
||||
* @param connection The connection being watched.
|
||||
* @param timeout The file descripter being watched.
|
||||
* @return handler callback
|
||||
*/
|
||||
timer_handler get_timer_handler(TcpConnection *const connection, const uint16_t timeout)
|
||||
{
|
||||
const auto fn = boost::bind(&Watcher::timeout_handler,
|
||||
this,
|
||||
boost::placeholders::_1,
|
||||
PTR_FROM_THIS(Watcher),
|
||||
connection,
|
||||
timeout);
|
||||
|
||||
const strand_weak_ptr wpstrand = _wpstrand;
|
||||
|
||||
return [fn, wpstrand](const boost::system::error_code &ec)
|
||||
{
|
||||
const strand_shared_ptr strand = wpstrand.lock();
|
||||
if (!strand)
|
||||
{
|
||||
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
|
||||
return;
|
||||
}
|
||||
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler method that is called by boost's io_context when the socket pumps a read event.
|
||||
* @param ec The status of the callback.
|
||||
|
|
@ -235,6 +271,39 @@ protected:
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback method that is called by libev when the timer expires
|
||||
* @param ec error code returned from loop
|
||||
* @param loop The loop in which the event was triggered
|
||||
* @param connection
|
||||
* @param timeout
|
||||
*/
|
||||
void timeout_handler(const boost::system::error_code &ec,
|
||||
std::weak_ptr<Watcher> awpThis,
|
||||
TcpConnection *const connection,
|
||||
const uint16_t timeout)
|
||||
{
|
||||
// Resolve any potential problems with dangling pointers
|
||||
// (remember we are using async).
|
||||
const std::shared_ptr<Watcher> apTimer = awpThis.lock();
|
||||
if (!apTimer) { return; }
|
||||
|
||||
if (!ec)
|
||||
{
|
||||
if (connection)
|
||||
{
|
||||
// send the heartbeat
|
||||
connection->heartbeat();
|
||||
}
|
||||
|
||||
// Reschedule the timer for the future:
|
||||
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
||||
|
||||
// Posts the timer event
|
||||
_timer.async_wait(get_timer_handler(connection, timeout));
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor- initialises the watcher and assigns the filedescriptor to
|
||||
|
|
@ -248,7 +317,8 @@ protected:
|
|||
const int fd) :
|
||||
_iocontext(io_context),
|
||||
_wpstrand(wpstrand),
|
||||
_socket(io_context)
|
||||
_socket(io_context),
|
||||
_timer(io_context)
|
||||
{
|
||||
_socket.assign(fd);
|
||||
|
||||
|
|
@ -271,6 +341,7 @@ protected:
|
|||
_read = false;
|
||||
_write = false;
|
||||
_socket.release();
|
||||
stop_timer();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -305,155 +376,31 @@ protected:
|
|||
get_write_handler(connection, fd));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Timer class to periodically fire a heartbeat
|
||||
*/
|
||||
class Timer : public std::enable_shared_from_this<Timer>
|
||||
{
|
||||
private:
|
||||
|
||||
/**
|
||||
* The boost asio io_context which is responsible for detecting events.
|
||||
* @var class boost::asio::io_context&
|
||||
*/
|
||||
boost::asio::io_context & _iocontext;
|
||||
|
||||
using strand_weak_ptr = std::weak_ptr<boost::asio::io_context::strand>;
|
||||
|
||||
/**
|
||||
* The boost asio io_context::strand managed pointer.
|
||||
* @var class std::shared_ptr<boost::asio::io_context>
|
||||
*/
|
||||
strand_weak_ptr _wpstrand;
|
||||
|
||||
/**
|
||||
* The boost asynchronous deadline timer.
|
||||
* @var class boost::asio::deadline_timer
|
||||
*/
|
||||
boost::asio::deadline_timer _timer;
|
||||
|
||||
using handler_fn = boost::function<void(boost::system::error_code)>;
|
||||
|
||||
/**
|
||||
* Binds and returns a lamba function handler for the io operation.
|
||||
* @param connection The connection being watched.
|
||||
* @param timeout The file descripter being watched.
|
||||
* @return handler callback
|
||||
*/
|
||||
handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout)
|
||||
{
|
||||
const auto fn = boost::bind(&Timer::timeout,
|
||||
this,
|
||||
boost::placeholders::_1,
|
||||
PTR_FROM_THIS(Timer),
|
||||
connection,
|
||||
timeout);
|
||||
|
||||
const strand_weak_ptr wpstrand = _wpstrand;
|
||||
|
||||
return [fn, wpstrand](const boost::system::error_code &ec)
|
||||
{
|
||||
const strand_shared_ptr strand = wpstrand.lock();
|
||||
if (!strand)
|
||||
{
|
||||
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
|
||||
return;
|
||||
}
|
||||
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback method that is called by libev when the timer expires
|
||||
* @param ec error code returned from loop
|
||||
* @param loop The loop in which the event was triggered
|
||||
* @param connection
|
||||
* @param timeout
|
||||
*/
|
||||
void timeout(const boost::system::error_code &ec,
|
||||
std::weak_ptr<Timer> awpThis,
|
||||
TcpConnection *const connection,
|
||||
const uint16_t timeout)
|
||||
{
|
||||
// Resolve any potential problems with dangling pointers
|
||||
// (remember we are using async).
|
||||
const std::shared_ptr<Timer> apTimer = awpThis.lock();
|
||||
if (!apTimer) { return; }
|
||||
|
||||
if (!ec)
|
||||
{
|
||||
if (connection)
|
||||
{
|
||||
// send the heartbeat
|
||||
connection->heartbeat();
|
||||
}
|
||||
|
||||
// Reschedule the timer for the future:
|
||||
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
|
||||
|
||||
// Posts the timer event
|
||||
_timer.async_wait(get_handler(connection, timeout));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the timer
|
||||
*/
|
||||
void stop()
|
||||
{
|
||||
// do nothing if it was never set
|
||||
_timer.cancel();
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param io_context The boost asio io_context.
|
||||
* @param wpstrand A weak pointer to a io_context::strand instance.
|
||||
*/
|
||||
Timer(boost::asio::io_context &io_context,
|
||||
const strand_weak_ptr wpstrand) :
|
||||
_iocontext(io_context),
|
||||
_wpstrand(wpstrand),
|
||||
_timer(io_context)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Timers cannot be copied or moved
|
||||
*
|
||||
* @param that The object to not move or copy
|
||||
*/
|
||||
Timer(Timer &&that) = delete;
|
||||
Timer(const Timer &that) = delete;
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
~Timer()
|
||||
{
|
||||
// stop the timer
|
||||
stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the expire time
|
||||
* @param connection
|
||||
* @param timeout
|
||||
*/
|
||||
void set(TcpConnection *connection, uint16_t timeout)
|
||||
void set_timer(TcpConnection *connection, uint16_t timeout)
|
||||
{
|
||||
// stop timer in case it was already set
|
||||
stop();
|
||||
stop_timer();
|
||||
|
||||
// Reschedule the timer for the future:
|
||||
_timer.expires_from_now(boost::posix_time::seconds(timeout));
|
||||
|
||||
// Posts the timer event
|
||||
_timer.async_wait(get_handler(connection, timeout));
|
||||
_timer.async_wait(get_timer_handler(connection, timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the timer
|
||||
*/
|
||||
void stop_timer()
|
||||
{
|
||||
// do nothing if it was never set
|
||||
_timer.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -477,13 +424,6 @@ protected:
|
|||
*/
|
||||
std::map<int, std::shared_ptr<Watcher> > _watchers;
|
||||
|
||||
/**
|
||||
* The boost asio io_context::deadline_timer managed pointer.
|
||||
* THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION
|
||||
* @var class std::shared_ptr<Timer>
|
||||
*/
|
||||
//std::shared_ptr<Timer> _timer;
|
||||
|
||||
/**
|
||||
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
|
||||
* @param connection The TCP connection object that is reporting
|
||||
|
|
@ -531,20 +471,22 @@ protected:
|
|||
* @param interval The suggested interval from the server
|
||||
* @return uint16_t The interval to use
|
||||
*/
|
||||
/* THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION */
|
||||
/*
|
||||
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
|
||||
{
|
||||
// skip if no heartbeats are needed
|
||||
if (interval == 0) return 0;
|
||||
|
||||
const auto fd = connection->fileno();
|
||||
|
||||
auto iter = _watchers.find(fd);
|
||||
if (iter == _watchers.end()) return 0;
|
||||
|
||||
// set the timer
|
||||
_timer->set(connection, interval);
|
||||
iter->second->set_timer(connection, interval);
|
||||
|
||||
// we agree with the interval
|
||||
return interval;
|
||||
}
|
||||
*/
|
||||
|
||||
public:
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue