diff --git a/include/amqpcpp/connection.h b/include/amqpcpp/connection.h index ade2c3b..ee4bb22 100644 --- a/include/amqpcpp/connection.h +++ b/include/amqpcpp/connection.h @@ -196,6 +196,15 @@ public: return _implementation.expected(); } + /** + * Is the connection ready to accept instructions / has passed the login handshake? + * @return bool + */ + bool ready() const + { + return _implementation.ready(); + } + /** * Is the connection in a usable state, or is it already closed or * in the process of being closed? diff --git a/include/amqpcpp/connectionimpl.h b/include/amqpcpp/connectionimpl.h index aa040b3..e9fa67d 100644 --- a/include/amqpcpp/connectionimpl.h +++ b/include/amqpcpp/connectionimpl.h @@ -218,10 +218,11 @@ public: } /** - * Are we fully connected? + * Are we fully connected and ready for instructions? This is true after the initial + * protocol and login handshake were completed. * @return bool */ - bool connected() const + bool ready() const { // state must be connected return _state == state_connected; diff --git a/include/amqpcpp/libev.h b/include/amqpcpp/libev.h index 416f5cd..2e6f25a 100644 --- a/include/amqpcpp/libev.h +++ b/include/amqpcpp/libev.h @@ -181,26 +181,21 @@ private: * When should we send the next heartbeat? * @var ev_tstamp */ - ev_tstamp _next; + ev_tstamp _next = 0.0; /** * When does the connection expire / was the server for a too longer period of time idle? + * During connection setup, this member is used as the connect-timeout. * @var ev_tstamp */ ev_tstamp _expire; /** - * Are heartbeats enabled? - * @var bool - */ - bool _enabled = false; - - /** - * Interval between heartbeats + * Interval between heartbeats (we should send every interval / 2 a new heartbeat) + * Value zero means heartbeats are disabled. * @var uint16_t */ - uint16_t _interval; - + uint16_t _interval = 0; /** * Callback method that is called by libev when the timer expires @@ -222,35 +217,49 @@ private: */ virtual void onExpired() override { - // do nothing if heartbeats are not enabled - if (!_enabled) return; - // get the current time ev_tstamp now = ev_now(_loop); - // should we send out a new heartbeat? - if (now >= _next) + // if the onNegotiate method was not yet called, and not heartbeat interval was negotiated + if (_interval == 0) { - // send a heartbeat frame - _connection->heartbeat(); + // there is a theoretical scenario in which the onNegotiate method + // was overridden, and the connection is still connected + if (_connection->ready()) return; - // remember when we should send out the next one - _next += _interval; - } - - // was the server idle for a too longer period of time? - if (now >= _expire) - { - // close the connection with immediate effect (this will destruct the connection) + // the timer expired because the connection could not be set up in time, + // close the connection with immediate effect _connection->close(true); + + // done + return; } else { - // find the earliest thing that expires - _timer.repeat = std::min(_next, _expire) - now; - - // restart the timer - ev_timer_again(_loop, &_timer); + // the connection is alive, and heartbeats are needed, should we send a new one? + if (now >= _next) + { + // send a heartbeat frame + _connection->heartbeat(); + + // remember when we should send out the next one + _next += std::max(_interval / 2, 1); + } + + // was the server idle for a too longer period of time? + if (now >= _expire) + { + // close the connection with immediate effect (this will destruct the connection) + _connection->close(true); + } + else + { + // find the earliest thing that expires + _timer.repeat = std::min(_next, _expire) - now; + + // restart the timer + ev_timer_again(_loop, &_timer); + } } } @@ -262,7 +271,7 @@ private: virtual void onActive(int fd, int events) override { // if the server is readable, we have some extra time before it expires - if (events & EV_READ) _expire = ev_now(_loop) + _interval * 2; + if (events & EV_READ) _expire = ev_now(_loop) + _interval; // pass on to the connection _connection->process(fd, events); @@ -274,20 +283,20 @@ private: * Constructor * @param loop The current event loop * @param connection The TCP connection - * @param interval Timer interval + * @param timeout Connect timeout */ - Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : + Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout = 60) : _connection(connection), _loop(loop), - _next(ev_now(loop) + timeout), - _expire(ev_now(loop) + timeout * 2), - _interval(timeout) + _next(0.0), + _expire(ev_now(loop) + timeout), + _interval(0) { // store the object in the data "void*" _timer.data = this; // initialize the libev structure - ev_timer_init(&_timer, callback, timeout, timeout); + ev_timer_init(&_timer, callback, timeout, 0.0); // start the timer (this is the time that we reserve for setting up the connection) ev_timer_start(_loop, &_timer); @@ -315,18 +324,32 @@ private: // stop the timer ev_timer_stop(_loop, &_timer); } - + /** * Start the timer (and expose the interval) - * @return uint16_t + * @param interval the heartbeat interval proposed by the server + * @return uint16_t the heartbeat interval that we accepted */ - uint16_t start() + uint16_t start(uint16_t interval) { - // remember that heartbeats are enabled - _enabled = true; + // we now know for sure that the connection was set up + _interval = interval; - // expose the interval (the timer is already running, so we do not have to explicitly start it) - return _interval; + // if heartbeats are disabled we do not have to set it + if (_interval == 0) return 0; + + // calculate current time + auto now = ev_now(_loop); + + // we also know when the next heartbeat should be sent + _next = now + std::max(1, _interval / 2); + + // find the earliest thing that expires + // @todo does this work? + _timer.repeat = std::min(_next, _expire) - now; + + // restart the timer + ev_timer_again(_loop, &_timer); } /** @@ -377,16 +400,15 @@ private: * @var struct ev_loop* */ struct ev_loop *_loop; - + /** * Each connection is wrapped * @var std::vector */ std::vector> _wrappers; - /** - * Lookup a connection-wrapper + * Lookup a connection-wrapper, when the wrapper is not found, we construct one * @param connection * @return Wrapper */ @@ -400,7 +422,7 @@ private: } // we need a new wrapper - auto *wrapper = new Wrapper(_loop, connection, 60); + auto *wrapper = new Wrapper(_loop, connection); // add to the wrappers _wrappers.emplace_back(wrapper); @@ -415,7 +437,7 @@ private: * @param fd The filedescriptor to be monitored * @param flags Should the object be monitored for readability or writability? */ - virtual void monitor(TcpConnection *connection, int fd, int flags) override + virtual void monitor(TcpConnection *connection, int fd, int flags) override final { // lookup the appropriate wrapper and start monitoring lookup(connection)->monitor(fd, flags); @@ -431,7 +453,7 @@ protected: virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override { // lookup the wrapper - return lookup(connection)->start(); + return lookup(connection)->start(interval); } /** diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 2040793..5095be1 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -236,6 +236,15 @@ public: */ bool close(bool immediate = false); + /** + * Is the connection connected, meaning: it has passed the login handshake? + * @return bool + */ + bool ready() const + { + return _connection.ready(); + } + /** * Is the connection in a usable state / not yet closed or being closed * When a connection is usable, you can send further commands over it. When it is diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index c275aa4..f3c5709 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -81,7 +81,7 @@ void ChannelImpl::onError(const ErrorCallback &callback) if (_connection == nullptr) return callback("Channel is not linked to a connection"); // if the connection is valid, this is a pure channel error - if (_connection->connected()) return callback("Channel is in an error state, but the connection is valid"); + if (_connection->ready()) return callback("Channel is in an error state, but the connection is valid"); // the connection is closing down if (_connection->closing()) return callback("Channel is in an error state, the AMQP connection is closing down");