From b4a67ac6189bfc9e09ecf46f48487317336027fd Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 6 Mar 2019 12:53:35 +0100 Subject: [PATCH] fixed dealing with timers in libev handler --- include/amqpcpp/libev.h | 99 +++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 44 deletions(-) diff --git a/include/amqpcpp/libev.h b/include/amqpcpp/libev.h index b3ce7b8..99ff723 100644 --- a/include/amqpcpp/libev.h +++ b/include/amqpcpp/libev.h @@ -214,6 +214,16 @@ private: object->onExpired(); } + /** + * Do we need timers / is this a timed monitor? + * @return bool + */ + bool timed() const + { + // if neither timers are set + return _expire > 0.0 || _next > 0.0; + } + /** * Method that is called when the timer expired */ @@ -221,49 +231,50 @@ private: { // get the current time ev_tstamp now = ev_now(_loop); - + + // timer is no longer active, so the refcounter in the loop is restored + ev_ref(_loop); + // if the onNegotiate method was not yet called, and no heartbeat timeout was negotiated if (_timeout == 0) { - // there is a theoretical scenario in which the onNegotiate() method - // was overridden, so that the connection-timeout-timer expires, but - // the connection is ready anyway -- in that case we should ignore the timeout. - // this also occurs when heartbeats are disabled. - if (_connection->ready()) - { - // we send no heartbeats, so the timer will be stopped. - // restore the loop refcount - ev_ref(_loop); - - // done - return; - } + // this can happen in three situations: 1. a connect-timeout, 2. user space has + // told us that we're not interested in heartbeats, 3. rabbitmq does not want heartbeats, + // in either case we're no longer going to run further timers. + _next = _expire = 0.0; - // the timer expired because the connection could not be set up in time, - // close the connection with immediate effect - _connection->close(true); + // if we have a valid connection, user-space must have overridden the onNegotiate + // method, so we keep using the connection + if (_connection->ready()) return; + + // this is a connection timeout, close the connection from our side too + return (void)_connection->close(true); } - else + else if (now >= _expire) { - // was the server idle for a too long period of time? - if (now >= _expire) return (void) _connection->close(); - - // the connection is alive, and heartbeats are needed, should we send a new one? - if (now >= _next) - { - // send a heartbeat frame - _connection->heartbeat(); - - // remember when we should send out the next one - _next += std::max(_timeout / 2, 1); - } - - // reset the timer to trigger again later - _timer.repeat = std::min(_next, _expire) - now; + // the server was inactive for a too long period of time, reset state + _next = _expire = 0.0; _timeout = 0; - // restart the timer - ev_timer_again(_loop, &_timer); + // close the connection because server was inactive + return (void)_connection->close(); } + else if (now >= _next) + { + // it's time for the next heartbeat + _connection->heartbeat(); + + // remember when we should send out the next one + _next += std::max(_timeout / 2, 1); + } + + // reset the timer to trigger again later + _timer.repeat = std::min(_next, _expire) - now; + + // restart the timer + ev_timer_again(_loop, &_timer); + + // and because the timer is running again, we restore the refcounter + ev_unref(_loop); } /** @@ -273,15 +284,13 @@ private: */ virtual void onActive(int fd, int events) override { - // if the server is readable, we have some extra time before it expires - // the expire time is set to 1.5 * _timeout to close the connection when the - // third heartbeat is about to be sent + // if the server is readable, we have some extra time before it expires, the expire time + // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent if (_timeout != 0 && (events & EV_READ)) _expire = ev_now(_loop) + _timeout * 1.5; // pass on to the connection _connection->process(fd, events); } - public: /** @@ -324,13 +333,13 @@ private: virtual ~Wrapper() { // the timer was already stopped - if (_timeout == 0) return; - - // restore loop refcount - ev_ref(_loop); + if (!timed()) return; // stop the timer ev_timer_stop(_loop, &_timer); + + // restore loop refcount + ev_ref(_loop); } /** @@ -351,9 +360,11 @@ private: // we also know when the next heartbeat should be sent _next = now + std::max(_timeout / 2, 1); + + // because the server has just sent us some data, we will update the expire time too + _expire = now + _timeout * 1.5; // find the earliest thing that expires - // @todo does this work? _timer.repeat = std::min(_next, _expire) - now; // restart the timer