diff --git a/include/libboostasio.h b/include/libboostasio.h index 897beb3..b8e0758 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -92,7 +92,7 @@ private: TcpConnection *connection, int fd) { - // Resolve any potential problems with dangling pointers + // Resolve any potential problems with dangling pointers // (remember we are using async). const std::shared_ptr apWatcher = awpWatcher.lock(); if (!apWatcher) { return; } @@ -133,7 +133,7 @@ private: TcpConnection *connection, int fd) { - // Resolve any potential problems with dangling pointers + // Resolve any potential problems with dangling pointers // (remember we are using async). const std::shared_ptr apWatcher = awpWatcher.lock(); if (!apWatcher) { return; } @@ -246,6 +246,111 @@ private: } }; + /** + * Timer class to periodically fire a heartbeat + */ + class Timer : public std::enable_shared_from_this + { + private: + + /** + * The boost asio io_service which is responsible for detecting events. + * @var class boost::asio::io_service& + */ + boost::asio::io_service & _ioservice; + + /** + * The boost asynchronous deadline timer. + * @var class boost::asio::deadline_timer + */ + boost::asio::deadline_timer _timer; + + /** + * Callback method that is called by libev when the timer expires + * @param loop The loop in which the event was triggered + * @param timer Internal timer object + * @param revents The events that triggered this call + */ + void timeout(const boost::system::error_code &ec, + std::weak_ptr awpThis, + TcpConnection *connection, + uint16_t timeout) + { + // Resolve any potential problems with dangling pointers + // (remember we are using async). + const std::shared_ptr apTimer = awpThis.lock(); + if (!apTimer) { return; } + + if (!ec) + { + if (connection) + { + // send the heartbeat + connection->heartbeat(); + } + + // Reschedule the timer for 1 second in the future: + _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); + + // Posts the timer event + _timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(), connection, timeout)); + } + } + + /** + * Stop the timer + */ + void stop() + { + // do nothing if it was never set + _timer.cancel(); + } + + public: + /** + * Constructor + * @param loop The current event loop + */ + Timer(boost::asio::io_service &io_service) : + _ioservice(io_service), + _timer(_ioservice) + { + + } + + /** + * Timers cannot be copied or moved + * + * @param that The object to not move or copy + */ + Timer(Timer &&that) = delete; + Timer(const Timer &that) = delete; + + /** + * Destructor + */ + ~Timer() + { + // stop the timer + stop(); + } + + /** + * Change the expire time + * @param connection + * @param timeout + */ + void set(TcpConnection *connection, uint16_t timeout) + { + // stop timer in case it was already set + stop(); + + + _timer.expires_from_now(boost::posix_time::seconds(timeout)); + _timer.async_wait(boost::bind(&Timer::timeout,this,boost::placeholders::_1, shared_from_this(),connection, timeout)); + } + }; + /** * The boost asio io_service. * @var class boost::asio::io_service& @@ -260,6 +365,9 @@ private: std::map > _watchers; + std::shared_ptr _timer; + + /** * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * @param connection The TCP connection object that is reporting @@ -293,6 +401,24 @@ private: } } +protected: + /** + * Method that is called when the heartbeat frequency is negotiated between the server and the client. + * @param connection The connection that suggested a heartbeat interval + * @param interval The suggested interval from the server + * @return uint16_t The interval to use + */ + virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override + { + // skip if no heartbeats are needed + if (interval == 0) return 0; + + // set the timer + _timer->set(connection, interval); + + // we agree with the interval + return interval; + } public: @@ -307,7 +433,12 @@ public: * Constructor * @param loop The boost io_service to wrap */ - explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : _ioservice(io_service) {} + explicit LibBoostAsioHandler(boost::asio::io_service &io_service) : + _ioservice(io_service), + _timer(std::make_shared(_ioservice)) + { + + } /** * Handler cannot be copied or moved