Added timer boost asio event handler to perform health checks.

This commit is contained in:
zerodefect 2017-10-13 09:26:32 +01:00
parent d1b2139af0
commit e3e0c6da19
1 changed files with 134 additions and 3 deletions

View File

@ -92,7 +92,7 @@ private:
TcpConnection *connection,
int fd)
{
// Resolve any potential problems with dangling pointers
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apWatcher = awpWatcher.lock();
if (!apWatcher) { return; }
@ -133,7 +133,7 @@ private:
TcpConnection *connection,
int fd)
{
// Resolve any potential problems with dangling pointers
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apWatcher = awpWatcher.lock();
if (!apWatcher) { return; }
@ -246,6 +246,111 @@ private:
}
};
/**
* Timer class to periodically fire a heartbeat
*/
class Timer : public std::enable_shared_from_this<Timer>
{
private:
/**
* The boost asio io_service which is responsible for detecting events.
* @var class boost::asio::io_service&
*/
boost::asio::io_service & _ioservice;
/**
* The boost asynchronous deadline timer.
* @var class boost::asio::deadline_timer
*/
boost::asio::deadline_timer _timer;
/**
* Callback method that is called by libev when the timer expires
* @param loop The loop in which the event was triggered
* @param timer Internal timer object
* @param revents The events that triggered this call
*/
void timeout(const boost::system::error_code &ec,
std::weak_ptr<Timer> awpThis,
TcpConnection *connection,
uint16_t timeout)
{
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Timer> apTimer = awpThis.lock();
if (!apTimer) { return; }
if (!ec)
{
if (connection)
{
// send the heartbeat
connection->heartbeat();
}
// Reschedule the timer for 1 second in the future:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Posts the timer event
_timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(), connection, timeout));
}
}
/**
* Stop the timer
*/
void stop()
{
// do nothing if it was never set
_timer.cancel();
}
public:
/**
* Constructor
* @param loop The current event loop
*/
Timer(boost::asio::io_service &io_service) :
_ioservice(io_service),
_timer(_ioservice)
{
}
/**
* Timers cannot be copied or moved
*
* @param that The object to not move or copy
*/
Timer(Timer &&that) = delete;
Timer(const Timer &that) = delete;
/**
* Destructor
*/
~Timer()
{
// stop the timer
stop();
}
/**
* Change the expire time
* @param connection
* @param timeout
*/
void set(TcpConnection *connection, uint16_t timeout)
{
// stop timer in case it was already set
stop();
_timer.expires_from_now(boost::posix_time::seconds(timeout));
_timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(),connection, timeout));
}
};
/**
* The boost asio io_service.
* @var class boost::asio::io_service&
@ -260,6 +365,9 @@ private:
std::map<int, std::shared_ptr<Watcher> > _watchers;
std::shared_ptr<Timer> _timer;
/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
@ -293,6 +401,24 @@ private:
}
}
protected:
/**
* Method that is called when the heartbeat frequency is negotiated between the server and the client.
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
{
// skip if no heartbeats are needed
if (interval == 0) return 0;
// set the timer
_timer->set(connection, interval);
// we agree with the interval
return interval;
}
public:
@ -307,7 +433,12 @@ public:
* Constructor
* @param loop The boost io_service to wrap
*/
explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : _ioservice(io_service) {}
explicit LibBoostAsioHandler(boost::asio::io_service &io_service) :
_ioservice(io_service),
_timer(std::make_shared<Timer>(_ioservice))
{
}
/**
* Handler cannot be copied or moved