From d83e88b95d9a2b6a1a7a1c42dd80d0d3cc21a478 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 11 Nov 2018 22:54:42 +0100 Subject: [PATCH] fixed the libev handler to be compatible with amqp-cpp 4.0, and we now also close the connection if the server stops sending heartbeats --- include/amqpcpp/libev.h | 299 ++++++++++++++++++++++++++++------------ 1 file changed, 209 insertions(+), 90 deletions(-) diff --git a/include/amqpcpp/libev.h b/include/amqpcpp/libev.h index facad3a..bb53378 100644 --- a/include/amqpcpp/libev.h +++ b/include/amqpcpp/libev.h @@ -33,6 +33,25 @@ namespace AMQP { class LibEvHandler : public TcpHandler { private: + /** + * Internal interface for the object that is being watched + */ + class Watchable + { + public: + /** + * The method that is called when a filedescriptor becomes active + * @param fd + * @param events + */ + virtual void onActive(int fd, int events) = 0; + + /** + * Method that is called when the timer expires + */ + virtual void onExpired() = 0; + }; + /** * Helper class that wraps a libev I/O watcher */ @@ -59,28 +78,28 @@ private: */ static void callback(struct ev_loop *loop, struct ev_io *watcher, int revents) { - // retrieve the connection - TcpConnection *connection = static_cast(watcher->data); + // retrieve the watched object + Watchable *object = static_cast(watcher->data); - // tell the connection that its filedescriptor is active - connection->process(watcher->fd, revents); + // tell the object that its filedescriptor is active + object->onActive(watcher->fd, revents); } public: /** * Constructor * @param loop The current event loop - * @param connection The connection being watched + * @param object The object being watched * @param fd The filedescriptor being watched * @param events The events that should be monitored */ - Watcher(struct ev_loop *loop, TcpConnection *connection, int fd, int events) : _loop(loop) + Watcher(struct ev_loop *loop, Watchable *object, int fd, int events) : _loop(loop) { // initialize the libev structure ev_io_init(&_io, callback, fd, events); - // store the connection in the data "void*" - _io.data = connection; + // store the object in the data "void*" + _io.data = object; // start the watcher ev_io_start(_loop, &_io); @@ -102,6 +121,13 @@ private: // stop the watcher ev_io_stop(_loop, &_io); } + + /** + * Check if a filedescriptor is covered by the watcher + * @param fd + * @return bool + */ + bool contains(int fd) const { return _io.fd == fd; } /** * Change the events for which the filedescriptor is monitored @@ -121,11 +147,18 @@ private: }; /** - * Timer class to periodically fire a heartbeat + * Wrapper around a connection, this will monitor the filedescriptors + * and run a timer to send out heartbeats */ - class Timer + class Wrapper : private Watchable { private: + /** + * The connection that is monitored + * @var TcpConnection + */ + TcpConnection *_connection; + /** * The event loop to which it is attached * @var struct ev_loop @@ -133,10 +166,35 @@ private: struct ev_loop *_loop; /** - * The actual watcher structure + * The watcher for the timer * @var struct ev_io */ struct ev_timer _timer; + + /** + * IO-watchers to monitor filedescriptors + * @var std::vector + */ + std::vector> _watchers; + + /** + * When should we send the next heartbeat? + * @var ev_tstamp + */ + ev_tstamp _next; + + /** + * When does the connection expire / was the server for a too longer period of time idle? + * @var ev_tstamp + */ + ev_tstamp _expire; + + /** + * Interval between heartbeats + * @var uint16_t + */ + uint16_t _interval; + /** * Callback method that is called by libev when the timer expires @@ -146,11 +204,60 @@ private: */ static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents) { - // retrieve the connection - TcpConnection *connection = static_cast(timer->data); + // retrieve the object + Watchable *object = static_cast(timer->data); - // send the heartbeat - connection->heartbeat(); + // tell the object that it expired + object->onExpired(); + } + + /** + * Method that is called when the timer expired + */ + virtual void onExpired() override + { + // get the current time + ev_tstamp now = ev_now(_loop); + + // should we send out a new heartbeat? + if (now >= _next) + { + // send a heartbeat frame + _connection->heartbeat(); + + // remember when we should send out the next one + _next += _interval; + } + + // was the server idle for a too longer period of time? + if (now >= _expire) + { + // close the connection with immediate effect (this will destruct the connection) + // @todo do we want to report an error too? + _connection->close(true); + } + else + { + // find the earliest thing that expires + _timer.repeat = std::min(_next, _expire) - now; + + // restart the timer + ev_timer_again(_loop, &_timer); + } + } + + /** + * Method that is called when a filedescriptor becomes active + * @param fd the filedescriptor that is active + * @param events the events that are active (readable/writable) + */ + virtual void onActive(int fd, int events) override + { + // if the server is readable, we have some extra time before it expires + if (events & EV_READ) _expire = ev_now(_loop) + _interval * 2; + + // pass on to the connection + _connection->process(fd, events); } @@ -161,16 +268,18 @@ private: * @param connection The TCP connection * @param interval Timer interval */ - Timer(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : _loop(loop) + Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : + _connection(connection), + _loop(loop), + _next(ev_now(loop) + timeout), + _expire(ev_now(loop) + timeout * 2), + _interval(timeout) { - // store the connection in the data "void*" - _timer.data = connection; + // store the object in the data "void*" + _timer.data = this; // initialize the libev structure - ev_timer_init(&_timer, callback, 60.0, 60.0); - - // set the timer - ev_timer_set(&_timer, timeout, timeout); + ev_timer_init(&_timer, callback, timeout, timeout); // and start it ev_timer_start(_loop, &_timer); @@ -184,13 +293,13 @@ private: * * @param that The object to not move or copy */ - Timer(Watcher &&that) = delete; - Timer(const Watcher &that) = delete; + Wrapper(Wrapper &&that) = delete; + Wrapper(const Wrapper &that) = delete; /** * Destructor */ - virtual ~Timer() + virtual ~Wrapper() { // restore loop refcount ev_ref(_loop); @@ -199,6 +308,15 @@ private: ev_timer_stop(_loop, &_timer); } + /** + * Expose the selected heartbeat interval + * @return uint16_t + */ + uint16_t interval() const + { + return _interval; + } + /** * Check if the timer is associated with a certain connection * @param connection @@ -206,7 +324,42 @@ private: */ bool contains(const AMQP::TcpConnection *connection) const { - return _timer.data == connection; + // compare the connections + return _connection == connection; + } + + /** + * Monitor a filedescriptor + * @param fd + * @param events + */ + void monitor(int fd, int events) + { + // should we remove? + if (events == 0) + { + // remove the io-watcher + _watchers.erase(std::remove_if(_watchers.begin(), _watchers.end(), [fd](const std::unique_ptr &watcher) -> bool { + + // compare filedescriptors + return watcher->contains(fd); + }), _watchers.end()); + } + else + { + // look in the array for this filedescriptor + for (auto &watcher : _watchers) + { + // do we have a match? + if (watcher->fd() != fd) continue; + + // change the events (and leap out) + return watcher->events(events); + } + + // we should monitor a new filedescriptor + _watchers.emplace_back(new Watcher(_loop, this, fd, events)); + } } }; @@ -217,33 +370,34 @@ private: struct ev_loop *_loop; /** - * All I/O watchers that are active, indexed by their filedescriptor - * @var std::map + * Each connection is wrapped + * @var std::vector */ - std::map> _watchers; - - /** - * A timer to periodically send out heartbeats, per connection - * @var Timer - */ - std::vector> _timers; + std::vector> _wrappers; /** - * Remove all settings that is associated with a certain connection + * Lookup a connection-wrapper * @param connection + * @return Wrapper */ - void remove(const AMQP::TcpConnection *connection) + Wrapper *lookup(TcpConnection *connection) { - // remove from the timers (std::remove moves the matching elements to the end) - auto iter = std::remove_if(_timers.begin(), _timers.end(), [connection](std::unique_ptr &timer) -> bool { - - // connection should match - return timer->contains(connection); - }); + // look for the appropriate connection + for (auto &wrapper : _wrappers) + { + // do we have a match? + if (wrapper->contains(connection)) return wrapper.get(); + } - // no remove the timers for real - _timers.erase(iter, _timers.end()); + // we need a new wrapper + auto *wrapper = new Wrapper(_loop, connection, 60); + + // add to the wrappers + _wrappers.emplace_back(wrapper); + + // done + return wrapper; } /** @@ -254,28 +408,8 @@ private: */ virtual void monitor(TcpConnection *connection, int fd, int flags) override { - // do we already have this filedescriptor - auto iter = _watchers.find(fd); - - // was it found? - if (iter == _watchers.end()) - { - // we did not yet have this watcher - but that is ok if no filedescriptor was registered - if (flags == 0) return; - - // construct a new watcher, and put it in the map - _watchers[fd] = std::unique_ptr(new Watcher(_loop, connection, fd, flags)); - } - else if (flags == 0) - { - // the watcher does already exist, but we no longer have to watch this watcher - _watchers.erase(iter); - } - else - { - // change the events - iter->second->events(flags); - } + // lookup the appropriate wrapper and start monitoring + lookup(connection)->monitor(fd, flags); } protected: @@ -287,35 +421,20 @@ protected: */ virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override { - // skip if no heartbeats are needed - if (interval == 0) return 0; - - // set the timer - _timers.emplace_back(new Timer(_loop, connection, interval)); - - // we agree with the interval - return interval; + // lookup the wrapper + return lookup(connection)->interval(); } /** - * Method that is called when the TCP connection ends up in an error state - * @param connection The TCP connection - * @param message Error message - */ - virtual void onError(TcpConnection *connection, const char *message) override - { - // forget about the timer for this connection - remove(connection); - } - - /** - * Method that is called when the TCP connection is closed + * Method that is called when the TCP connection is destructed * @param connection The TCP connection */ - virtual void onClosed(TcpConnection *connection) override + virtual void onDetached(TcpConnection *connection) override { - // forget about the timer for this connection - remove(connection); + // remove from the array + _wrappers.erase(std::remove_if(_wrappers.begin(), _wrappers.end(), [connection](const std::unique_ptr &wrapper) -> bool { + return wrapper->contains(connection); + }), _wrappers.end()); } public: