monitor class moved from include directory to src

This commit is contained in:
Emiel Bruijntjes 2014-01-06 13:34:54 -08:00
parent 42549a7812
commit aba769f5cb
7 changed files with 20 additions and 14 deletions

View File

@ -43,12 +43,6 @@ private:
*/ */
uint16_t _id; uint16_t _id;
/**
* Monitor to check if the connection is still alive
* @var Monitor
*/
Monitor _monitor;
/** /**
* State of the channel object * State of the channel object
* @var enum * @var enum
@ -90,6 +84,15 @@ public:
*/ */
virtual ~ChannelImpl(); virtual ~ChannelImpl();
/**
* Invalidate the channel
* This method is called when the connection is destructed
*/
void invalidate()
{
_connection = nullptr;
}
/** /**
* Pause deliveries on a channel * Pause deliveries on a channel
* *

View File

@ -29,7 +29,6 @@
#include <libamqp/receivedframe.h> #include <libamqp/receivedframe.h>
#include <libamqp/outbuffer.h> #include <libamqp/outbuffer.h>
#include <libamqp/watchable.h> #include <libamqp/watchable.h>
#include <libamqp/monitor.h>
// amqp types // amqp types
#include <libamqp/field.h> #include <libamqp/field.h>

View File

@ -51,8 +51,7 @@ namespace AMQP {
ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) : ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) :
_parent(parent), _parent(parent),
_connection(&connection->_implementation), _connection(&connection->_implementation),
_handler(handler), _handler(handler)
_monitor(_connection)
{ {
// add the channel to the connection // add the channel to the connection
_id = _connection->add(this); _id = _connection->add(this);
@ -86,7 +85,7 @@ ChannelImpl::~ChannelImpl()
_message = nullptr; _message = nullptr;
// remove this channel from the connection (but not if the connection is already destructed) // remove this channel from the connection (but not if the connection is already destructed)
if (_monitor.valid()) _connection->remove(this); if (_connection) _connection->remove(this);
// close the channel now // close the channel now
close(); close();
@ -334,7 +333,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
if (!send(BasicHeaderFrame(_id, envelope))) return false; if (!send(BasicHeaderFrame(_id, envelope))) return false;
// channel and connection still valid? // channel and connection still valid?
if (!monitor.valid() || !_monitor.valid()) return false; if (!monitor.valid() || !_connection) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer // the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->maxPayload(); uint32_t maxpayload = _connection->maxPayload();
@ -444,10 +443,10 @@ bool ChannelImpl::recover(int flags)
bool ChannelImpl::send(const Frame &frame) bool ChannelImpl::send(const Frame &frame)
{ {
// skip if channel is not connected // skip if channel is not connected
if (_state != state_connected) return false; if (_state != state_connected || !_connection) return false;
// send to tcp connection (first check if connection object was not destructed) // send to tcp connection
return _monitor.valid() && _connection->send(frame); return _connection->send(frame);
} }
/** /**

View File

@ -42,6 +42,9 @@ ConnectionImpl::~ConnectionImpl()
{ {
// close the connection in a nice fashion // close the connection in a nice fashion
close(); close();
// invalidate all channels, so they will no longer call methods on this channel object
for (auto iter = _channels.begin(); iter != _channels.end(); iter++) iter->second->invalidate();
} }
/** /**

View File

@ -11,6 +11,7 @@
#include "../libamqp.h" #include "../libamqp.h"
// classes that are very commonly used // classes that are very commonly used
#include "monitor.h"
#include "exception.h" #include "exception.h"
#include "protocolexception.h" #include "protocolexception.h"
#include "frame.h" #include "frame.h"

View File

@ -88,6 +88,7 @@ void MyConnection::onConnected(Network::TcpSocket *socket)
// we declare a queue, an exchange and we publish a message // we declare a queue, an exchange and we publish a message
_channel->declareQueue("my_queue"); _channel->declareQueue("my_queue");
// _channel->declareQueue("my_queue", AMQP::autodelete);
_channel->declareExchange("my_exchange", AMQP::direct); _channel->declareExchange("my_exchange", AMQP::direct);
_channel->bindQueue("my_exchange", "my_queue", "key"); _channel->bindQueue("my_exchange", "my_queue", "key");
} }