Merge pull request #271 from CopernicaMarketingSoftware/heartbeats

Implemented custom heartbeats in Livev
This commit is contained in:
Emiel Bruijntjes 2018-12-13 23:41:20 +01:00 committed by GitHub
commit 20d4104d18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 53 deletions

View File

@ -196,6 +196,15 @@ public:
return _implementation.expected();
}
/**
* Is the connection ready to accept instructions / has passed the login handshake?
* @return bool
*/
bool ready() const
{
return _implementation.ready();
}
/**
* Is the connection in a usable state, or is it already closed or
* in the process of being closed?

View File

@ -218,10 +218,11 @@ public:
}
/**
* Are we fully connected?
* Are we fully connected and ready for instructions? This is true after the initial
* protocol and login handshake were completed.
* @return bool
*/
bool connected() const
bool ready() const
{
// state must be connected
return _state == state_connected;

View File

@ -181,26 +181,21 @@ private:
* When should we send the next heartbeat?
* @var ev_tstamp
*/
ev_tstamp _next;
ev_tstamp _next = 0.0;
/**
* When does the connection expire / was the server for a too longer period of time idle?
* During connection setup, this member is used as the connect-timeout.
* @var ev_tstamp
*/
ev_tstamp _expire;
/**
* Are heartbeats enabled?
* @var bool
*/
bool _enabled = false;
/**
* Interval between heartbeats
* Interval between heartbeats (we should send every interval / 2 a new heartbeat)
* Value zero means heartbeats are disabled.
* @var uint16_t
*/
uint16_t _interval;
uint16_t _interval = 0;
/**
* Callback method that is called by libev when the timer expires
@ -222,35 +217,49 @@ private:
*/
virtual void onExpired() override
{
// do nothing if heartbeats are not enabled
if (!_enabled) return;
// get the current time
ev_tstamp now = ev_now(_loop);
// should we send out a new heartbeat?
if (now >= _next)
// if the onNegotiate method was not yet called, and not heartbeat interval was negotiated
if (_interval == 0)
{
// send a heartbeat frame
_connection->heartbeat();
// there is a theoretical scenario in which the onNegotiate method
// was overridden, and the connection is still connected
if (_connection->ready()) return;
// remember when we should send out the next one
_next += _interval;
}
// was the server idle for a too longer period of time?
if (now >= _expire)
{
// close the connection with immediate effect (this will destruct the connection)
// the timer expired because the connection could not be set up in time,
// close the connection with immediate effect
_connection->close(true);
// done
return;
}
else
{
// find the earliest thing that expires
_timer.repeat = std::min(_next, _expire) - now;
// restart the timer
ev_timer_again(_loop, &_timer);
// the connection is alive, and heartbeats are needed, should we send a new one?
if (now >= _next)
{
// send a heartbeat frame
_connection->heartbeat();
// remember when we should send out the next one
_next += std::max(_interval / 2, 1);
}
// was the server idle for a too longer period of time?
if (now >= _expire)
{
// close the connection with immediate effect (this will destruct the connection)
_connection->close(true);
}
else
{
// find the earliest thing that expires
_timer.repeat = std::min(_next, _expire) - now;
// restart the timer
ev_timer_again(_loop, &_timer);
}
}
}
@ -262,7 +271,7 @@ private:
virtual void onActive(int fd, int events) override
{
// if the server is readable, we have some extra time before it expires
if (events & EV_READ) _expire = ev_now(_loop) + _interval * 2;
if (events & EV_READ) _expire = ev_now(_loop) + _interval;
// pass on to the connection
_connection->process(fd, events);
@ -274,20 +283,20 @@ private:
* Constructor
* @param loop The current event loop
* @param connection The TCP connection
* @param interval Timer interval
* @param timeout Connect timeout
*/
Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) :
Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout = 60) :
_connection(connection),
_loop(loop),
_next(ev_now(loop) + timeout),
_expire(ev_now(loop) + timeout * 2),
_interval(timeout)
_next(0.0),
_expire(ev_now(loop) + timeout),
_interval(0)
{
// store the object in the data "void*"
_timer.data = this;
// initialize the libev structure
ev_timer_init(&_timer, callback, timeout, timeout);
ev_timer_init(&_timer, callback, timeout, 0.0);
// start the timer (this is the time that we reserve for setting up the connection)
ev_timer_start(_loop, &_timer);
@ -315,18 +324,32 @@ private:
// stop the timer
ev_timer_stop(_loop, &_timer);
}
/**
* Start the timer (and expose the interval)
* @return uint16_t
* @param interval the heartbeat interval proposed by the server
* @return uint16_t the heartbeat interval that we accepted
*/
uint16_t start()
uint16_t start(uint16_t interval)
{
// remember that heartbeats are enabled
_enabled = true;
// we now know for sure that the connection was set up
_interval = interval;
// expose the interval (the timer is already running, so we do not have to explicitly start it)
return _interval;
// if heartbeats are disabled we do not have to set it
if (_interval == 0) return 0;
// calculate current time
auto now = ev_now(_loop);
// we also know when the next heartbeat should be sent
_next = now + std::max(1, _interval / 2);
// find the earliest thing that expires
// @todo does this work?
_timer.repeat = std::min(_next, _expire) - now;
// restart the timer
ev_timer_again(_loop, &_timer);
}
/**
@ -377,16 +400,15 @@ private:
* @var struct ev_loop*
*/
struct ev_loop *_loop;
/**
* Each connection is wrapped
* @var std::vector
*/
std::vector<std::unique_ptr<Wrapper>> _wrappers;
/**
* Lookup a connection-wrapper
* Lookup a connection-wrapper, when the wrapper is not found, we construct one
* @param connection
* @return Wrapper
*/
@ -400,7 +422,7 @@ private:
}
// we need a new wrapper
auto *wrapper = new Wrapper(_loop, connection, 60);
auto *wrapper = new Wrapper(_loop, connection);
// add to the wrappers
_wrappers.emplace_back(wrapper);
@ -415,7 +437,7 @@ private:
* @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability?
*/
virtual void monitor(TcpConnection *connection, int fd, int flags) override
virtual void monitor(TcpConnection *connection, int fd, int flags) override final
{
// lookup the appropriate wrapper and start monitoring
lookup(connection)->monitor(fd, flags);
@ -431,7 +453,7 @@ protected:
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
{
// lookup the wrapper
return lookup(connection)->start();
return lookup(connection)->start(interval);
}
/**

View File

@ -236,6 +236,15 @@ public:
*/
bool close(bool immediate = false);
/**
* Is the connection connected, meaning: it has passed the login handshake?
* @return bool
*/
bool ready() const
{
return _connection.ready();
}
/**
* Is the connection in a usable state / not yet closed or being closed
* When a connection is usable, you can send further commands over it. When it is

View File

@ -81,7 +81,7 @@ void ChannelImpl::onError(const ErrorCallback &callback)
if (_connection == nullptr) return callback("Channel is not linked to a connection");
// if the connection is valid, this is a pure channel error
if (_connection->connected()) return callback("Channel is in an error state, but the connection is valid");
if (_connection->ready()) return callback("Channel is in an error state, but the connection is valid");
// the connection is closing down
if (_connection->closing()) return callback("Channel is in an error state, the AMQP connection is closing down");