fixed the libev handler to be compatible with amqp-cpp 4.0, and we now also close the connection if the server stops sending heartbeats
This commit is contained in:
parent
fc21b62852
commit
d83e88b95d
|
|
@ -33,6 +33,25 @@ namespace AMQP {
|
|||
class LibEvHandler : public TcpHandler
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* Internal interface for the object that is being watched
|
||||
*/
|
||||
class Watchable
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* The method that is called when a filedescriptor becomes active
|
||||
* @param fd
|
||||
* @param events
|
||||
*/
|
||||
virtual void onActive(int fd, int events) = 0;
|
||||
|
||||
/**
|
||||
* Method that is called when the timer expires
|
||||
*/
|
||||
virtual void onExpired() = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Helper class that wraps a libev I/O watcher
|
||||
*/
|
||||
|
|
@ -59,28 +78,28 @@ private:
|
|||
*/
|
||||
static void callback(struct ev_loop *loop, struct ev_io *watcher, int revents)
|
||||
{
|
||||
// retrieve the connection
|
||||
TcpConnection *connection = static_cast<TcpConnection*>(watcher->data);
|
||||
// retrieve the watched object
|
||||
Watchable *object = static_cast<Watchable*>(watcher->data);
|
||||
|
||||
// tell the connection that its filedescriptor is active
|
||||
connection->process(watcher->fd, revents);
|
||||
// tell the object that its filedescriptor is active
|
||||
object->onActive(watcher->fd, revents);
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param loop The current event loop
|
||||
* @param connection The connection being watched
|
||||
* @param object The object being watched
|
||||
* @param fd The filedescriptor being watched
|
||||
* @param events The events that should be monitored
|
||||
*/
|
||||
Watcher(struct ev_loop *loop, TcpConnection *connection, int fd, int events) : _loop(loop)
|
||||
Watcher(struct ev_loop *loop, Watchable *object, int fd, int events) : _loop(loop)
|
||||
{
|
||||
// initialize the libev structure
|
||||
ev_io_init(&_io, callback, fd, events);
|
||||
|
||||
// store the connection in the data "void*"
|
||||
_io.data = connection;
|
||||
// store the object in the data "void*"
|
||||
_io.data = object;
|
||||
|
||||
// start the watcher
|
||||
ev_io_start(_loop, &_io);
|
||||
|
|
@ -102,6 +121,13 @@ private:
|
|||
// stop the watcher
|
||||
ev_io_stop(_loop, &_io);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a filedescriptor is covered by the watcher
|
||||
* @param fd
|
||||
* @return bool
|
||||
*/
|
||||
bool contains(int fd) const { return _io.fd == fd; }
|
||||
|
||||
/**
|
||||
* Change the events for which the filedescriptor is monitored
|
||||
|
|
@ -121,11 +147,18 @@ private:
|
|||
};
|
||||
|
||||
/**
|
||||
* Timer class to periodically fire a heartbeat
|
||||
* Wrapper around a connection, this will monitor the filedescriptors
|
||||
* and run a timer to send out heartbeats
|
||||
*/
|
||||
class Timer
|
||||
class Wrapper : private Watchable
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The connection that is monitored
|
||||
* @var TcpConnection
|
||||
*/
|
||||
TcpConnection *_connection;
|
||||
|
||||
/**
|
||||
* The event loop to which it is attached
|
||||
* @var struct ev_loop
|
||||
|
|
@ -133,10 +166,35 @@ private:
|
|||
struct ev_loop *_loop;
|
||||
|
||||
/**
|
||||
* The actual watcher structure
|
||||
* The watcher for the timer
|
||||
* @var struct ev_io
|
||||
*/
|
||||
struct ev_timer _timer;
|
||||
|
||||
/**
|
||||
* IO-watchers to monitor filedescriptors
|
||||
* @var std::vector
|
||||
*/
|
||||
std::vector<std::unique_ptr<Watcher>> _watchers;
|
||||
|
||||
/**
|
||||
* When should we send the next heartbeat?
|
||||
* @var ev_tstamp
|
||||
*/
|
||||
ev_tstamp _next;
|
||||
|
||||
/**
|
||||
* When does the connection expire / was the server for a too longer period of time idle?
|
||||
* @var ev_tstamp
|
||||
*/
|
||||
ev_tstamp _expire;
|
||||
|
||||
/**
|
||||
* Interval between heartbeats
|
||||
* @var uint16_t
|
||||
*/
|
||||
uint16_t _interval;
|
||||
|
||||
|
||||
/**
|
||||
* Callback method that is called by libev when the timer expires
|
||||
|
|
@ -146,11 +204,60 @@ private:
|
|||
*/
|
||||
static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
|
||||
{
|
||||
// retrieve the connection
|
||||
TcpConnection *connection = static_cast<TcpConnection*>(timer->data);
|
||||
// retrieve the object
|
||||
Watchable *object = static_cast<Watchable*>(timer->data);
|
||||
|
||||
// send the heartbeat
|
||||
connection->heartbeat();
|
||||
// tell the object that it expired
|
||||
object->onExpired();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when the timer expired
|
||||
*/
|
||||
virtual void onExpired() override
|
||||
{
|
||||
// get the current time
|
||||
ev_tstamp now = ev_now(_loop);
|
||||
|
||||
// should we send out a new heartbeat?
|
||||
if (now >= _next)
|
||||
{
|
||||
// send a heartbeat frame
|
||||
_connection->heartbeat();
|
||||
|
||||
// remember when we should send out the next one
|
||||
_next += _interval;
|
||||
}
|
||||
|
||||
// was the server idle for a too longer period of time?
|
||||
if (now >= _expire)
|
||||
{
|
||||
// close the connection with immediate effect (this will destruct the connection)
|
||||
// @todo do we want to report an error too?
|
||||
_connection->close(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
// find the earliest thing that expires
|
||||
_timer.repeat = std::min(_next, _expire) - now;
|
||||
|
||||
// restart the timer
|
||||
ev_timer_again(_loop, &_timer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when a filedescriptor becomes active
|
||||
* @param fd the filedescriptor that is active
|
||||
* @param events the events that are active (readable/writable)
|
||||
*/
|
||||
virtual void onActive(int fd, int events) override
|
||||
{
|
||||
// if the server is readable, we have some extra time before it expires
|
||||
if (events & EV_READ) _expire = ev_now(_loop) + _interval * 2;
|
||||
|
||||
// pass on to the connection
|
||||
_connection->process(fd, events);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -161,16 +268,18 @@ private:
|
|||
* @param connection The TCP connection
|
||||
* @param interval Timer interval
|
||||
*/
|
||||
Timer(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : _loop(loop)
|
||||
Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) :
|
||||
_connection(connection),
|
||||
_loop(loop),
|
||||
_next(ev_now(loop) + timeout),
|
||||
_expire(ev_now(loop) + timeout * 2),
|
||||
_interval(timeout)
|
||||
{
|
||||
// store the connection in the data "void*"
|
||||
_timer.data = connection;
|
||||
// store the object in the data "void*"
|
||||
_timer.data = this;
|
||||
|
||||
// initialize the libev structure
|
||||
ev_timer_init(&_timer, callback, 60.0, 60.0);
|
||||
|
||||
// set the timer
|
||||
ev_timer_set(&_timer, timeout, timeout);
|
||||
ev_timer_init(&_timer, callback, timeout, timeout);
|
||||
|
||||
// and start it
|
||||
ev_timer_start(_loop, &_timer);
|
||||
|
|
@ -184,13 +293,13 @@ private:
|
|||
*
|
||||
* @param that The object to not move or copy
|
||||
*/
|
||||
Timer(Watcher &&that) = delete;
|
||||
Timer(const Watcher &that) = delete;
|
||||
Wrapper(Wrapper &&that) = delete;
|
||||
Wrapper(const Wrapper &that) = delete;
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~Timer()
|
||||
virtual ~Wrapper()
|
||||
{
|
||||
// restore loop refcount
|
||||
ev_ref(_loop);
|
||||
|
|
@ -199,6 +308,15 @@ private:
|
|||
ev_timer_stop(_loop, &_timer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Expose the selected heartbeat interval
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t interval() const
|
||||
{
|
||||
return _interval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the timer is associated with a certain connection
|
||||
* @param connection
|
||||
|
|
@ -206,7 +324,42 @@ private:
|
|||
*/
|
||||
bool contains(const AMQP::TcpConnection *connection) const
|
||||
{
|
||||
return _timer.data == connection;
|
||||
// compare the connections
|
||||
return _connection == connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitor a filedescriptor
|
||||
* @param fd
|
||||
* @param events
|
||||
*/
|
||||
void monitor(int fd, int events)
|
||||
{
|
||||
// should we remove?
|
||||
if (events == 0)
|
||||
{
|
||||
// remove the io-watcher
|
||||
_watchers.erase(std::remove_if(_watchers.begin(), _watchers.end(), [fd](const std::unique_ptr<Watcher> &watcher) -> bool {
|
||||
|
||||
// compare filedescriptors
|
||||
return watcher->contains(fd);
|
||||
}), _watchers.end());
|
||||
}
|
||||
else
|
||||
{
|
||||
// look in the array for this filedescriptor
|
||||
for (auto &watcher : _watchers)
|
||||
{
|
||||
// do we have a match?
|
||||
if (watcher->fd() != fd) continue;
|
||||
|
||||
// change the events (and leap out)
|
||||
return watcher->events(events);
|
||||
}
|
||||
|
||||
// we should monitor a new filedescriptor
|
||||
_watchers.emplace_back(new Watcher(_loop, this, fd, events));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -217,33 +370,34 @@ private:
|
|||
struct ev_loop *_loop;
|
||||
|
||||
/**
|
||||
* All I/O watchers that are active, indexed by their filedescriptor
|
||||
* @var std::map<int,Watcher>
|
||||
* Each connection is wrapped
|
||||
* @var std::vector
|
||||
*/
|
||||
std::map<int,std::unique_ptr<Watcher>> _watchers;
|
||||
|
||||
/**
|
||||
* A timer to periodically send out heartbeats, per connection
|
||||
* @var Timer
|
||||
*/
|
||||
std::vector<std::unique_ptr<Timer>> _timers;
|
||||
std::vector<std::unique_ptr<Wrapper>> _wrappers;
|
||||
|
||||
|
||||
/**
|
||||
* Remove all settings that is associated with a certain connection
|
||||
* Lookup a connection-wrapper
|
||||
* @param connection
|
||||
* @return Wrapper
|
||||
*/
|
||||
void remove(const AMQP::TcpConnection *connection)
|
||||
Wrapper *lookup(TcpConnection *connection)
|
||||
{
|
||||
// remove from the timers (std::remove moves the matching elements to the end)
|
||||
auto iter = std::remove_if(_timers.begin(), _timers.end(), [connection](std::unique_ptr<Timer> &timer) -> bool {
|
||||
|
||||
// connection should match
|
||||
return timer->contains(connection);
|
||||
});
|
||||
// look for the appropriate connection
|
||||
for (auto &wrapper : _wrappers)
|
||||
{
|
||||
// do we have a match?
|
||||
if (wrapper->contains(connection)) return wrapper.get();
|
||||
}
|
||||
|
||||
// no remove the timers for real
|
||||
_timers.erase(iter, _timers.end());
|
||||
// we need a new wrapper
|
||||
auto *wrapper = new Wrapper(_loop, connection, 60);
|
||||
|
||||
// add to the wrappers
|
||||
_wrappers.emplace_back(wrapper);
|
||||
|
||||
// done
|
||||
return wrapper;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -254,28 +408,8 @@ private:
|
|||
*/
|
||||
virtual void monitor(TcpConnection *connection, int fd, int flags) override
|
||||
{
|
||||
// do we already have this filedescriptor
|
||||
auto iter = _watchers.find(fd);
|
||||
|
||||
// was it found?
|
||||
if (iter == _watchers.end())
|
||||
{
|
||||
// we did not yet have this watcher - but that is ok if no filedescriptor was registered
|
||||
if (flags == 0) return;
|
||||
|
||||
// construct a new watcher, and put it in the map
|
||||
_watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_loop, connection, fd, flags));
|
||||
}
|
||||
else if (flags == 0)
|
||||
{
|
||||
// the watcher does already exist, but we no longer have to watch this watcher
|
||||
_watchers.erase(iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
// change the events
|
||||
iter->second->events(flags);
|
||||
}
|
||||
// lookup the appropriate wrapper and start monitoring
|
||||
lookup(connection)->monitor(fd, flags);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
|
@ -287,35 +421,20 @@ protected:
|
|||
*/
|
||||
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
|
||||
{
|
||||
// skip if no heartbeats are needed
|
||||
if (interval == 0) return 0;
|
||||
|
||||
// set the timer
|
||||
_timers.emplace_back(new Timer(_loop, connection, interval));
|
||||
|
||||
// we agree with the interval
|
||||
return interval;
|
||||
// lookup the wrapper
|
||||
return lookup(connection)->interval();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when the TCP connection ends up in an error state
|
||||
* @param connection The TCP connection
|
||||
* @param message Error message
|
||||
*/
|
||||
virtual void onError(TcpConnection *connection, const char *message) override
|
||||
{
|
||||
// forget about the timer for this connection
|
||||
remove(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when the TCP connection is closed
|
||||
* Method that is called when the TCP connection is destructed
|
||||
* @param connection The TCP connection
|
||||
*/
|
||||
virtual void onClosed(TcpConnection *connection) override
|
||||
virtual void onDetached(TcpConnection *connection) override
|
||||
{
|
||||
// forget about the timer for this connection
|
||||
remove(connection);
|
||||
// remove from the array
|
||||
_wrappers.erase(std::remove_if(_wrappers.begin(), _wrappers.end(), [connection](const std::unique_ptr<Wrapper> &wrapper) -> bool {
|
||||
return wrapper->contains(connection);
|
||||
}), _wrappers.end());
|
||||
}
|
||||
|
||||
public:
|
||||
|
|
|
|||
Loading…
Reference in New Issue