libev implementation was incorrect when one single handler was used to manage multiple connections

This commit is contained in:
Aljar Meesters 2018-04-12 12:05:22 +02:00
parent 93a9b37843
commit 1db6ed13a6
1 changed files with 65 additions and 44 deletions

View File

@ -20,7 +20,6 @@
* Dependencies * Dependencies
*/ */
#include <ev.h> #include <ev.h>
#include "amqpcpp/linux_tcp.h" #include "amqpcpp/linux_tcp.h"
/** /**
@ -154,36 +153,30 @@ private:
connection->heartbeat(); connection->heartbeat();
} }
/**
* Stop the timer
*/
void stop()
{
// do nothing if it was never set
if (_timer.data == nullptr) return;
// restore loop refcount
ev_ref(_loop);
// stop the timer
ev_timer_stop(_loop, &_timer);
// restore data nullptr to indicate that timer is not set
_timer.data = nullptr;
}
public: public:
/** /**
* Constructor * Constructor
* @param loop The current event loop * @param loop The current event loop
* @param connection The TCP connection
* @param interval Timer interval
*/ */
Timer(struct ev_loop *loop) : _loop(loop) Timer(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout) : _loop(loop)
{ {
// there is no data yet // store the connection in the data "void*"
_timer.data = nullptr; _timer.data = connection;
// initialize the libev structure // initialize the libev structure
ev_timer_init(&_timer, callback, 60.0, 60.0); ev_timer_init(&_timer, callback, 60.0, 60.0);
// set the timer
ev_timer_set(&_timer, timeout, timeout);
// and start it
ev_timer_start(_loop, &_timer);
// the timer should not keep the event loop active
ev_unref(_loop);
} }
/** /**
@ -199,31 +192,21 @@ private:
*/ */
virtual ~Timer() virtual ~Timer()
{ {
// restore loop refcount
ev_ref(_loop);
// stop the timer // stop the timer
stop(); ev_timer_stop(_loop, &_timer);
} }
/** /**
* Change the expire time * Check if the timer is associated with a certain connection
* @param connection * @param connection
* @param timeout * @return bool
*/ */
void set(TcpConnection *connection, uint16_t timeout) bool contains(const AMQP::TcpConnection *connection) const
{ {
// stop timer in case it was already set return _timer.data == connection;
stop();
// store the connection in the data "void*"
_timer.data = connection;
// set the timer
ev_timer_set(&_timer, timeout, timeout);
// and start it
ev_timer_start(_loop, &_timer);
// the timer should not keep the event loop active
ev_unref(_loop);
} }
}; };
@ -240,12 +223,29 @@ private:
std::map<int,std::unique_ptr<Watcher>> _watchers; std::map<int,std::unique_ptr<Watcher>> _watchers;
/** /**
* A timer to periodically send out heartbeats * A timer to periodically send out heartbeats, per connection
* @var Timer * @var Timer
*/ */
Timer _timer; std::vector<std::unique_ptr<Timer>> _timers;
/**
* Remove all settings that is associated with a certain connection
* @param connection
*/
void remove(const AMQP::TcpConnection *connection)
{
// remove from the timers (std::remove moves the matching elements to the end)
auto iter = std::remove_if(_timers.begin(), _timers.end(), [connection](std::unique_ptr<Timer> &timer) -> bool {
// connection should match
return timer->contains(connection);
});
// no remove the timers for real
_timers.erase(iter, _timers.end());
}
/** /**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting * @param connection The TCP connection object that is reporting
@ -291,18 +291,39 @@ protected:
if (interval == 0) return 0; if (interval == 0) return 0;
// set the timer // set the timer
_timer.set(connection, interval); _timers.emplace_back(new Timer(_loop, connection, interval));
// we agree with the interval // we agree with the interval
return interval; return interval;
} }
/**
* Method that is called when the TCP connection ends up in an error state
* @param connection The TCP connection
* @param message Error message
*/
virtual void onError(TcpConnection *connection, const char *message) override
{
// forget about the timer for this connection
remove(connection);
}
/**
* Method that is called when the TCP connection is closed
* @param connection The TCP connection
*/
virtual void onClosed(TcpConnection *connection) override
{
// forget about the timer for this connection
remove(connection);
}
public: public:
/** /**
* Constructor * Constructor
* @param loop The event loop to wrap * @param loop The event loop to wrap
*/ */
LibEvHandler(struct ev_loop *loop) : _loop(loop), _timer(loop) {} LibEvHandler(struct ev_loop *loop) : _loop(loop) {}
/** /**
* Destructor * Destructor