diff --git a/include/amqpcpp/libev.h b/include/amqpcpp/libev.h index 1fc1f32..facad3a 100644 --- a/include/amqpcpp/libev.h +++ b/include/amqpcpp/libev.h @@ -20,7 +20,6 @@ * Dependencies */ #include - #include "amqpcpp/linux_tcp.h" /** @@ -154,36 +153,30 @@ private: connection->heartbeat(); } - /** - * Stop the timer - */ - void stop() - { - // do nothing if it was never set - if (_timer.data == nullptr) return; - - // restore loop refcount - ev_ref(_loop); - - // stop the timer - ev_timer_stop(_loop, &_timer); - - // restore data nullptr to indicate that timer is not set - _timer.data = nullptr; - } public: /** * Constructor * @param loop The current event loop + * @param connection The TCP connection + * @param interval Timer interval */ - Timer(struct ev_loop *loop) : _loop(loop) + Timer(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : _loop(loop) { - // there is no data yet - _timer.data = nullptr; + // store the connection in the data "void*" + _timer.data = connection; // initialize the libev structure ev_timer_init(&_timer, callback, 60.0, 60.0); + + // set the timer + ev_timer_set(&_timer, timeout, timeout); + + // and start it + ev_timer_start(_loop, &_timer); + + // the timer should not keep the event loop active + ev_unref(_loop); } /** @@ -199,31 +192,21 @@ private: */ virtual ~Timer() { + // restore loop refcount + ev_ref(_loop); + // stop the timer - stop(); + ev_timer_stop(_loop, &_timer); } - + /** - * Change the expire time + * Check if the timer is associated with a certain connection * @param connection - * @param timeout + * @return bool */ - void set(TcpConnection *connection, uint16_t timeout) + bool contains(const AMQP::TcpConnection *connection) const { - // stop timer in case it was already set - stop(); - - // store the connection in the data "void*" - _timer.data = connection; - - // set the timer - ev_timer_set(&_timer, timeout, timeout); - - // and start it - ev_timer_start(_loop, &_timer); - - // the timer should not keep the event loop active - ev_unref(_loop); + return _timer.data == connection; } }; @@ -240,12 +223,29 @@ private: std::map> _watchers; /** - * A timer to periodically send out heartbeats + * A timer to periodically send out heartbeats, per connection * @var Timer */ - Timer _timer; + std::vector> _timers; + /** + * Remove all settings that is associated with a certain connection + * @param connection + */ + void remove(const AMQP::TcpConnection *connection) + { + // remove from the timers (std::remove moves the matching elements to the end) + auto iter = std::remove_if(_timers.begin(), _timers.end(), [connection](std::unique_ptr &timer) -> bool { + + // connection should match + return timer->contains(connection); + }); + + // no remove the timers for real + _timers.erase(iter, _timers.end()); + } + /** * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * @param connection The TCP connection object that is reporting @@ -291,18 +291,39 @@ protected: if (interval == 0) return 0; // set the timer - _timer.set(connection, interval); + _timers.emplace_back(new Timer(_loop, connection, interval)); // we agree with the interval return interval; } + /** + * Method that is called when the TCP connection ends up in an error state + * @param connection The TCP connection + * @param message Error message + */ + virtual void onError(TcpConnection *connection, const char *message) override + { + // forget about the timer for this connection + remove(connection); + } + + /** + * Method that is called when the TCP connection is closed + * @param connection The TCP connection + */ + virtual void onClosed(TcpConnection *connection) override + { + // forget about the timer for this connection + remove(connection); + } + public: /** * Constructor * @param loop The event loop to wrap */ - LibEvHandler(struct ev_loop *loop) : _loop(loop), _timer(loop) {} + LibEvHandler(struct ev_loop *loop) : _loop(loop) {} /** * Destructor