diff --git a/include/channelimpl.h b/include/channelimpl.h index 9aaaf3f..ad02d27 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -43,12 +43,6 @@ private: */ uint16_t _id; - /** - * Monitor to check if the connection is still alive - * @var Monitor - */ - Monitor _monitor; - /** * State of the channel object * @var enum @@ -90,6 +84,15 @@ public: */ virtual ~ChannelImpl(); + /** + * Invalidate the channel + * This method is called when the connection is destructed + */ + void invalidate() + { + _connection = nullptr; + } + /** * Pause deliveries on a channel * diff --git a/libamqp.h b/libamqp.h index 83dc696..28c3675 100644 --- a/libamqp.h +++ b/libamqp.h @@ -29,7 +29,6 @@ #include #include #include -#include // amqp types #include diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index aaeee2c..9d0a88f 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -51,8 +51,7 @@ namespace AMQP { ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) : _parent(parent), _connection(&connection->_implementation), - _handler(handler), - _monitor(_connection) + _handler(handler) { // add the channel to the connection _id = _connection->add(this); @@ -86,7 +85,7 @@ ChannelImpl::~ChannelImpl() _message = nullptr; // remove this channel from the connection (but not if the connection is already destructed) - if (_monitor.valid()) _connection->remove(this); + if (_connection) _connection->remove(this); // close the channel now close(); @@ -334,7 +333,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin if (!send(BasicHeaderFrame(_id, envelope))) return false; // channel and connection still valid? - if (!monitor.valid() || !_monitor.valid()) return false; + if (!monitor.valid() || !_connection) return false; // the max payload size is the max frame size minus the bytes for headers and trailer uint32_t maxpayload = _connection->maxPayload(); @@ -444,10 +443,10 @@ bool ChannelImpl::recover(int flags) bool ChannelImpl::send(const Frame &frame) { // skip if channel is not connected - if (_state != state_connected) return false; + if (_state != state_connected || !_connection) return false; - // send to tcp connection (first check if connection object was not destructed) - return _monitor.valid() && _connection->send(frame); + // send to tcp connection + return _connection->send(frame); } /** diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 8b99873..e12a431 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -42,6 +42,9 @@ ConnectionImpl::~ConnectionImpl() { // close the connection in a nice fashion close(); + + // invalidate all channels, so they will no longer call methods on this channel object + for (auto iter = _channels.begin(); iter != _channels.end(); iter++) iter->second->invalidate(); } /** diff --git a/src/includes.h b/src/includes.h index 6d3b1b1..c24f5f3 100644 --- a/src/includes.h +++ b/src/includes.h @@ -11,6 +11,7 @@ #include "../libamqp.h" // classes that are very commonly used +#include "monitor.h" #include "exception.h" #include "protocolexception.h" #include "frame.h" diff --git a/include/monitor.h b/src/monitor.h similarity index 100% rename from include/monitor.h rename to src/monitor.h diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index c57fb56..94eb6f3 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -88,6 +88,7 @@ void MyConnection::onConnected(Network::TcpSocket *socket) // we declare a queue, an exchange and we publish a message _channel->declareQueue("my_queue"); +// _channel->declareQueue("my_queue", AMQP::autodelete); _channel->declareExchange("my_exchange", AMQP::direct); _channel->bindQueue("my_exchange", "my_queue", "key"); }