Implemented custom heartbeats in Livev

This commit is contained in:
Youri Moll 2018-12-13 13:01:38 +01:00
parent 6a02a62169
commit a92dc27653
5 changed files with 94 additions and 53 deletions

View File

@ -196,6 +196,15 @@ public:
return _implementation.expected(); return _implementation.expected();
} }
/**
* Is the connection ready to accept instructions / has passed the login handshake?
* @return bool
*/
bool ready() const
{
return _implementation.ready();
}
/** /**
* Is the connection in a usable state, or is it already closed or * Is the connection in a usable state, or is it already closed or
* in the process of being closed? * in the process of being closed?

View File

@ -218,10 +218,11 @@ public:
} }
/** /**
* Are we fully connected? * Are we fully connected and ready for instructions? This is true after the initial
* protocol and login handshake were completed.
* @return bool * @return bool
*/ */
bool connected() const bool ready() const
{ {
// state must be connected // state must be connected
return _state == state_connected; return _state == state_connected;

View File

@ -181,26 +181,21 @@ private:
* When should we send the next heartbeat? * When should we send the next heartbeat?
* @var ev_tstamp * @var ev_tstamp
*/ */
ev_tstamp _next; ev_tstamp _next = 0.0;
/** /**
* When does the connection expire / was the server for a too longer period of time idle? * When does the connection expire / was the server for a too longer period of time idle?
* During connection setup, this member is used as the connect-timeout.
* @var ev_tstamp * @var ev_tstamp
*/ */
ev_tstamp _expire; ev_tstamp _expire;
/** /**
* Are heartbeats enabled? * Interval between heartbeats (we should send every interval / 2 a new heartbeat)
* @var bool * Value zero means heartbeats are disabled.
*/
bool _enabled = false;
/**
* Interval between heartbeats
* @var uint16_t * @var uint16_t
*/ */
uint16_t _interval; uint16_t _interval = 0;
/** /**
* Callback method that is called by libev when the timer expires * Callback method that is called by libev when the timer expires
@ -222,35 +217,49 @@ private:
*/ */
virtual void onExpired() override virtual void onExpired() override
{ {
// do nothing if heartbeats are not enabled
if (!_enabled) return;
// get the current time // get the current time
ev_tstamp now = ev_now(_loop); ev_tstamp now = ev_now(_loop);
// should we send out a new heartbeat? // if the onNegotiate method was not yet called, and not heartbeat interval was negotiated
if (now >= _next) if (_interval == 0)
{ {
// send a heartbeat frame // there is a theoretical scenario in which the onNegotiate method
_connection->heartbeat(); // was overridden, and the connection is still connected
if (_connection->ready()) return;
// remember when we should send out the next one // the timer expired because the connection could not be set up in time,
_next += _interval; // close the connection with immediate effect
}
// was the server idle for a too longer period of time?
if (now >= _expire)
{
// close the connection with immediate effect (this will destruct the connection)
_connection->close(true); _connection->close(true);
// done
return;
} }
else else
{ {
// find the earliest thing that expires // the connection is alive, and heartbeats are needed, should we send a new one?
_timer.repeat = std::min(_next, _expire) - now; if (now >= _next)
{
// send a heartbeat frame
_connection->heartbeat();
// restart the timer // remember when we should send out the next one
ev_timer_again(_loop, &_timer); _next += std::max(_interval / 2, 1);
}
// was the server idle for a too longer period of time?
if (now >= _expire)
{
// close the connection with immediate effect (this will destruct the connection)
_connection->close(true);
}
else
{
// find the earliest thing that expires
_timer.repeat = std::min(_next, _expire) - now;
// restart the timer
ev_timer_again(_loop, &_timer);
}
} }
} }
@ -262,7 +271,7 @@ private:
virtual void onActive(int fd, int events) override virtual void onActive(int fd, int events) override
{ {
// if the server is readable, we have some extra time before it expires // if the server is readable, we have some extra time before it expires
if (events & EV_READ) _expire = ev_now(_loop) + _interval * 2; if (events & EV_READ) _expire = ev_now(_loop) + _interval;
// pass on to the connection // pass on to the connection
_connection->process(fd, events); _connection->process(fd, events);
@ -274,20 +283,20 @@ private:
* Constructor * Constructor
* @param loop The current event loop * @param loop The current event loop
* @param connection The TCP connection * @param connection The TCP connection
* @param interval Timer interval * @param timeout Connect timeout
*/ */
Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout = 60) :
_connection(connection), _connection(connection),
_loop(loop), _loop(loop),
_next(ev_now(loop) + timeout), _next(0.0),
_expire(ev_now(loop) + timeout * 2), _expire(ev_now(loop) + timeout),
_interval(timeout) _interval(0)
{ {
// store the object in the data "void*" // store the object in the data "void*"
_timer.data = this; _timer.data = this;
// initialize the libev structure // initialize the libev structure
ev_timer_init(&_timer, callback, timeout, timeout); ev_timer_init(&_timer, callback, timeout, 0.0);
// start the timer (this is the time that we reserve for setting up the connection) // start the timer (this is the time that we reserve for setting up the connection)
ev_timer_start(_loop, &_timer); ev_timer_start(_loop, &_timer);
@ -318,15 +327,29 @@ private:
/** /**
* Start the timer (and expose the interval) * Start the timer (and expose the interval)
* @return uint16_t * @param interval the heartbeat interval proposed by the server
* @return uint16_t the heartbeat interval that we accepted
*/ */
uint16_t start() uint16_t start(uint16_t interval)
{ {
// remember that heartbeats are enabled // we now know for sure that the connection was set up
_enabled = true; _interval = interval;
// expose the interval (the timer is already running, so we do not have to explicitly start it) // if heartbeats are disabled we do not have to set it
return _interval; if (_interval == 0) return 0;
// calculate current time
auto now = ev_now(_loop);
// we also know when the next heartbeat should be sent
_next = now + std::max(1, _interval / 2);
// find the earliest thing that expires
// @todo does this work?
_timer.repeat = std::min(_next, _expire) - now;
// restart the timer
ev_timer_again(_loop, &_timer);
} }
/** /**
@ -384,9 +407,8 @@ private:
*/ */
std::vector<std::unique_ptr<Wrapper>> _wrappers; std::vector<std::unique_ptr<Wrapper>> _wrappers;
/** /**
* Lookup a connection-wrapper * Lookup a connection-wrapper, when the wrapper is not found, we construct one
* @param connection * @param connection
* @return Wrapper * @return Wrapper
*/ */
@ -400,7 +422,7 @@ private:
} }
// we need a new wrapper // we need a new wrapper
auto *wrapper = new Wrapper(_loop, connection, 60); auto *wrapper = new Wrapper(_loop, connection);
// add to the wrappers // add to the wrappers
_wrappers.emplace_back(wrapper); _wrappers.emplace_back(wrapper);
@ -415,7 +437,7 @@ private:
* @param fd The filedescriptor to be monitored * @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability? * @param flags Should the object be monitored for readability or writability?
*/ */
virtual void monitor(TcpConnection *connection, int fd, int flags) override virtual void monitor(TcpConnection *connection, int fd, int flags) override final
{ {
// lookup the appropriate wrapper and start monitoring // lookup the appropriate wrapper and start monitoring
lookup(connection)->monitor(fd, flags); lookup(connection)->monitor(fd, flags);
@ -431,7 +453,7 @@ protected:
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
{ {
// lookup the wrapper // lookup the wrapper
return lookup(connection)->start(); return lookup(connection)->start(interval);
} }
/** /**

View File

@ -236,6 +236,15 @@ public:
*/ */
bool close(bool immediate = false); bool close(bool immediate = false);
/**
* Is the connection connected, meaning: it has passed the login handshake?
* @return bool
*/
bool ready() const
{
return _connection.ready();
}
/** /**
* Is the connection in a usable state / not yet closed or being closed * Is the connection in a usable state / not yet closed or being closed
* When a connection is usable, you can send further commands over it. When it is * When a connection is usable, you can send further commands over it. When it is

View File

@ -81,7 +81,7 @@ void ChannelImpl::onError(const ErrorCallback &callback)
if (_connection == nullptr) return callback("Channel is not linked to a connection"); if (_connection == nullptr) return callback("Channel is not linked to a connection");
// if the connection is valid, this is a pure channel error // if the connection is valid, this is a pure channel error
if (_connection->connected()) return callback("Channel is in an error state, but the connection is valid"); if (_connection->ready()) return callback("Channel is in an error state, but the connection is valid");
// the connection is closing down // the connection is closing down
if (_connection->closing()) return callback("Channel is in an error state, the AMQP connection is closing down"); if (_connection->closing()) return callback("Channel is in an error state, the AMQP connection is closing down");