diff --git a/include/amqpcpp/monitor.h b/include/amqpcpp/monitor.h index efb788c..321b76e 100644 --- a/include/amqpcpp/monitor.h +++ b/include/amqpcpp/monitor.h @@ -64,7 +64,25 @@ public: Monitor(const Monitor &monitor) : _watchable(monitor._watchable) { // register with the watchable - _watchable->add(this); + if (_watchable) _watchable->add(this); + } + + /** + * Assignment operator + * @param monitor + */ + Monitor& operator= (const Monitor &monitor) + { + // remove from watchable + if (_watchable) _watchable->remove(this); + + // replace watchable + _watchable = monitor._watchable; + + // register with the watchable + if (_watchable) _watchable->add(this); + + return *this; } /** diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index 5fc002a..abca375 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -194,7 +194,7 @@ public: * Proceed to the next state * @return TcpState * */ - TcpState *proceed() + TcpState *proceed(const Monitor &monitor) { // do we have a valid socket? if (_socket >= 0) @@ -211,6 +211,9 @@ public: // report error _handler->onError(_connection, _error.data()); + // handler callback might have destroyed connection + if (!monitor.valid()) return nullptr; + // create dummy implementation return new TcpClosed(_connection, _handler); } @@ -229,7 +232,7 @@ public: if (fd != _pipe.in() || !(flags & readable)) return this; // proceed to the next state - return proceed(); + return proceed(monitor); } /** @@ -243,7 +246,7 @@ public: _thread.join(); // proceed to the next state - return proceed(); + return proceed(monitor); } /** @@ -262,4 +265,3 @@ public: * End of namespace */ } -