Fix segfault when the handler is destructed from within a callback

This commit is contained in:
Martijn Otto 2016-01-15 14:19:09 +01:00
parent 25ce57818a
commit 89c2075a5f
4 changed files with 45 additions and 29 deletions

View File

@ -1,12 +1,12 @@
/** /**
* LibEV.h * LibEV.h
* *
* Implementation for the AMQP::TcpHandler that is optimized for libev. You can * Implementation for the AMQP::TcpHandler that is optimized for libev. You can
* use this class instead of a AMQP::TcpHandler class, just pass the event loop * use this class instead of a AMQP::TcpHandler class, just pass the event loop
* to the constructor and you're all set * to the constructor and you're all set
* *
* Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread" * Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread"
* *
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV * @copyright 2015 Copernica BV
*/ */
@ -43,7 +43,7 @@ private:
* @var struct ev_loop * @var struct ev_loop
*/ */
struct ev_loop *_loop; struct ev_loop *_loop;
/** /**
* The actual watcher structure * The actual watcher structure
* @var struct ev_io * @var struct ev_io
@ -60,11 +60,11 @@ private:
{ {
// retrieve the connection // retrieve the connection
TcpConnection *connection = static_cast<TcpConnection*>(watcher->data); TcpConnection *connection = static_cast<TcpConnection*>(watcher->data);
// tell the connection that its filedescriptor is active // tell the connection that its filedescriptor is active
connection->process(watcher->fd, revents); connection->process(watcher->fd, revents);
} }
public: public:
/** /**
* Constructor * Constructor
@ -77,14 +77,22 @@ private:
{ {
// initialize the libev structure // initialize the libev structure
ev_io_init(&_io, callback, fd, events); ev_io_init(&_io, callback, fd, events);
// store the connection in the data "void*" // store the connection in the data "void*"
_io.data = connection; _io.data = connection;
// start the watcher // start the watcher
ev_io_start(_loop, &_io); ev_io_start(_loop, &_io);
} }
/**
* Watchers cannot be copied or moved
*
* @param that The object to not move or copy
*/
Watcher(Watcher &&that) = delete;
Watcher(const Watcher &that) = delete;
/** /**
* Destructor * Destructor
*/ */
@ -93,7 +101,7 @@ private:
// stop the watcher // stop the watcher
ev_io_stop(_loop, &_io); ev_io_stop(_loop, &_io);
} }
/** /**
* Change the events for which the filedescriptor is monitored * Change the events for which the filedescriptor is monitored
* @param events * @param events
@ -102,10 +110,10 @@ private:
{ {
// stop the watcher if it was active // stop the watcher if it was active
ev_io_stop(_loop, &_io); ev_io_stop(_loop, &_io);
// set the events // set the events
ev_io_set(&_io, _io.fd, events); ev_io_set(&_io, _io.fd, events);
// and restart it // and restart it
ev_io_start(_loop, &_io); ev_io_start(_loop, &_io);
} }
@ -117,14 +125,14 @@ private:
* @var struct ev_loop* * @var struct ev_loop*
*/ */
struct ev_loop *_loop; struct ev_loop *_loop;
/** /**
* All I/O watchers that are active, indexed by their filedescriptor * All I/O watchers that are active, indexed by their filedescriptor
* @var std::map<int,Watcher> * @var std::map<int,Watcher>
*/ */
std::map<int,std::unique_ptr<Watcher>> _watchers; std::map<int,std::unique_ptr<Watcher>> _watchers;
/** /**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting * @param connection The TCP connection object that is reporting
@ -135,13 +143,13 @@ private:
{ {
// do we already have this filedescriptor // do we already have this filedescriptor
auto iter = _watchers.find(fd); auto iter = _watchers.find(fd);
// was it found? // was it found?
if (iter == _watchers.end()) if (iter == _watchers.end())
{ {
// we did not yet have this watcher - but that is ok if no filedescriptor was registered // we did not yet have this watcher - but that is ok if no filedescriptor was registered
if (flags == 0) return; if (flags == 0) return;
// construct a new watcher, and put it in the map // construct a new watcher, and put it in the map
_watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_loop, connection, fd, flags)); _watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_loop, connection, fd, flags));
} }
@ -163,13 +171,13 @@ public:
* @param loop The event loop to wrap * @param loop The event loop to wrap
*/ */
LibEvHandler(struct ev_loop *loop) : _loop(loop) {} LibEvHandler(struct ev_loop *loop) : _loop(loop) {}
/** /**
* Destructor * Destructor
*/ */
virtual ~LibEvHandler() = default; virtual ~LibEvHandler() = default;
}; };
/** /**
* End of namespace * End of namespace
*/ */

View File

@ -26,7 +26,9 @@ class TcpState;
/** /**
* Class definition * Class definition
*/ */
class TcpConnection : private ConnectionHandler class TcpConnection :
private ConnectionHandler,
private Watchable
{ {
private: private:
/** /**

View File

@ -47,17 +47,23 @@ TcpConnection::~TcpConnection()
*/ */
void TcpConnection::process(int fd, int flags) void TcpConnection::process(int fd, int flags)
{ {
// monitor the object for destruction
Monitor monitor{ this };
// pass on the the state, that returns a new impl // pass on the the state, that returns a new impl
auto *result = _state->process(fd, flags); auto *result = _state->process(fd, flags);
// are we still valid
if (!monitor.valid()) return;
// skip if the same state is continued to be used, or when the process() // skip if the same state is continued to be used, or when the process()
// method returns nullptr (which only happens when the object is destructed, // method returns nullptr (which only happens when the object is destructed,
// and "this" is no longer valid) // and "this" is no longer valid)
if (!result || result == _state) return; if (!result || result == _state) return;
// remove old state // remove old state
delete _state; delete _state;
// replace it with the new implementation // replace it with the new implementation
_state = result; _state = result;
} }
@ -105,10 +111,10 @@ void TcpConnection::onError(Connection *connection, const char *message)
{ {
// current object is going to be removed, wrap it in a unique pointer to enforce that // current object is going to be removed, wrap it in a unique pointer to enforce that
std::unique_ptr<TcpState> ptr(_state); std::unique_ptr<TcpState> ptr(_state);
// object is now in a closed state // object is now in a closed state
_state = new TcpClosed(_state); _state = new TcpClosed(_state);
// tell the implementation to report the error // tell the implementation to report the error
ptr->reportError(message); ptr->reportError(message);
} }
@ -131,10 +137,10 @@ void TcpConnection::onClosed(Connection *connection)
{ {
// current object is going to be removed, wrap it in a unique pointer to enforce that // current object is going to be removed, wrap it in a unique pointer to enforce that
std::unique_ptr<TcpState> ptr(_state); std::unique_ptr<TcpState> ptr(_state);
// object is now in a closed state // object is now in a closed state
_state = new TcpClosed(_state); _state = new TcpClosed(_state);
// tell the implementation to report the error // tell the implementation to report the error
ptr->reportClosed(); ptr->reportClosed();
} }

View File

@ -177,7 +177,7 @@ public:
{ {
// only works if the incoming pipe is readable // only works if the incoming pipe is readable
if (fd != _pipe.in() || !(flags & readable)) return this; if (fd != _pipe.in() || !(flags & readable)) return this;
// do we have a valid socket? // do we have a valid socket?
if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);