when a connection was closed, the commands already given to the channels were sometimes by-passed by the closing channel. This has been fixed. Also solved the problem that calls executed on channel objects that fall out of scope were not always completed

This commit is contained in:
Emiel Bruijntjes 2014-08-20 11:47:16 +02:00
parent 170ef1f82a
commit 6997a70cf1
32 changed files with 343 additions and 196 deletions

View File

@ -20,19 +20,23 @@ private:
* The implementation for the channel
* @var ChannelImpl
*/
ChannelImpl _implementation;
std::shared_ptr<ChannelImpl> _implementation;
public:
/**
* Construct a channel object
* @param connection
*/
Channel(Connection *connection) : _implementation(this, connection) {}
Channel(Connection *connection) : _implementation(ChannelImpl::instantiate(this, connection)) {}
/**
* Destructor
*/
virtual ~Channel() {}
virtual ~Channel()
{
// close the channel (this will eventually destruct the channel)
_implementation->close();
}
/**
* Callback that is called when the channel was succesfully created.
@ -45,7 +49,7 @@ public:
void onReady(const SuccessCallback &callback)
{
// store callback in implementation
_implementation._readyCallback = callback;
_implementation->_readyCallback = callback;
}
/**
@ -59,7 +63,7 @@ public:
void onError(const ErrorCallback &callback)
{
// store callback in implementation
_implementation._errorCallback = callback;
_implementation->_errorCallback = callback;
}
/**
@ -72,7 +76,7 @@ public:
*/
Deferred &pause()
{
return _implementation.pause();
return _implementation->pause();
}
/**
@ -85,7 +89,7 @@ public:
*/
Deferred &resume()
{
return _implementation.resume();
return _implementation->resume();
}
/**
@ -94,7 +98,7 @@ public:
*/
bool connected()
{
return _implementation.connected();
return _implementation->connected();
}
/**
@ -105,7 +109,7 @@ public:
*/
Deferred &startTransaction()
{
return _implementation.startTransaction();
return _implementation->startTransaction();
}
/**
@ -116,7 +120,7 @@ public:
*/
Deferred &commitTransaction()
{
return _implementation.commitTransaction();
return _implementation->commitTransaction();
}
/**
@ -127,7 +131,7 @@ public:
*/
Deferred &rollbackTransaction()
{
return _implementation.rollbackTransaction();
return _implementation->rollbackTransaction();
}
/**
@ -149,12 +153,12 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); }
Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); }
Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); }
Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); }
Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(name, type, flags, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation->declareExchange(name, type, 0, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(name, type, flags, Table()); }
Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(std::string(), type, flags, arguments); }
Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation->declareExchange(std::string(), type, 0, arguments); }
Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(std::string(), type, flags, Table()); }
/**
* Remove an exchange
@ -169,7 +173,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); }
Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation->removeExchange(name, flags); }
/**
* Bind two exchanges to each other
@ -182,8 +186,8 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, arguments); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation.bindExchange(source, target, routingkey, Table()); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation->bindExchange(source, target, routingkey, arguments); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation->bindExchange(source, target, routingkey, Table()); }
/**
* Unbind two exchanges from one another
@ -196,8 +200,8 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, arguments); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation.unbindExchange(target, source, routingkey, Table()); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation->unbindExchange(target, source, routingkey, arguments); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation->unbindExchange(target, source, routingkey, Table()); }
/**
* Declare a queue
@ -228,12 +232,12 @@ public:
*
* });
*/
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); }
DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); }
DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); }
DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); }
DeferredQueue &declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); }
DeferredQueue &declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); }
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation->declareQueue(name, flags, arguments); }
DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation->declareQueue(name, 0, arguments); }
DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation->declareQueue(name, flags, Table()); }
DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation->declareQueue(std::string(), flags, arguments); }
DeferredQueue &declareQueue(const Table &arguments) { return _implementation->declareQueue(std::string(), 0, arguments); }
DeferredQueue &declareQueue(int flags = 0) { return _implementation->declareQueue(std::string(), flags, Table()); }
/**
* Bind a queue to an exchange
@ -246,8 +250,8 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, arguments); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.bindQueue(exchange, queue, routingkey, Table()); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation->bindQueue(exchange, queue, routingkey, arguments); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation->bindQueue(exchange, queue, routingkey, Table()); }
/**
* Unbind a queue from an exchange
@ -259,8 +263,8 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation->unbindQueue(exchange, queue, routingkey, arguments); }
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation->unbindQueue(exchange, queue, routingkey, Table()); }
/**
* Purge a queue
@ -280,7 +284,7 @@ public:
*
* });
*/
DeferredDelete &purgeQueue(const std::string &name){ return _implementation.purgeQueue(name); }
DeferredDelete &purgeQueue(const std::string &name){ return _implementation->purgeQueue(name); }
/**
* Remove a queue
@ -306,7 +310,7 @@ public:
*
* });
*/
DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); }
DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation->removeQueue(name, flags); }
/**
* Publish a message to an exchange
@ -317,10 +321,10 @@ public:
* @param message the message to send
* @param size size of the message
*/
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); }
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); }
/**
* Set the Quality of Service (QOS) for this channel
@ -337,7 +341,7 @@ public:
*/
Deferred &setQos(uint16_t prefetchCount, bool global = false)
{
return _implementation.setQos(prefetchCount, global);
return _implementation->setQos(prefetchCount, global);
}
/**
@ -373,12 +377,12 @@ public:
*
* });
*/
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); }
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); }
DeferredConsumer &consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); }
DeferredConsumer &consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); }
DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); }
DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); }
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation->consume(queue, tag, flags, arguments); }
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation->consume(queue, tag, flags, Table()); }
DeferredConsumer &consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation->consume(queue, tag, 0, arguments); }
DeferredConsumer &consume(const std::string &queue, int flags, const Table &arguments) { return _implementation->consume(queue, std::string(), flags, arguments); }
DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation->consume(queue, std::string(), flags, Table()); }
DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation->consume(queue, std::string(), 0, arguments); }
/**
* Cancel a running consume call
@ -400,7 +404,7 @@ public:
*
* });
*/
DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); }
DeferredCancel &cancel(const std::string &tag) { return _implementation->cancel(tag); }
/**
* Retrieve a single message from RabbitMQ
@ -434,7 +438,7 @@ public:
*
* });
*/
DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation.get(queue, flags); }
DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation->get(queue, flags); }
/**
* Acknoldge a received message
@ -452,7 +456,7 @@ public:
* @param flags optional flags
* @return bool
*/
bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); }
bool ack(uint64_t deliveryTag, int flags=0) { return _implementation->ack(deliveryTag, flags); }
/**
* Reject or nack a message
@ -470,7 +474,7 @@ public:
* @param flags optional flags
* @return bool
*/
bool reject(uint64_t deliveryTag, int flags=0) { return _implementation.reject(deliveryTag, flags); }
bool reject(uint64_t deliveryTag, int flags=0) { return _implementation->reject(deliveryTag, flags); }
/**
* Recover all messages that were not yet acked
@ -487,7 +491,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &recover(int flags = 0) { return _implementation.recover(flags); }
Deferred &recover(int flags = 0) { return _implementation->recover(flags); }
/**
* Close the current channel
@ -495,7 +499,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &close() { return _implementation.close(); }
Deferred &close() { return _implementation->close(); }
/**
* Get the channel we're working on
@ -503,7 +507,7 @@ public:
*/
const uint16_t id() const
{
return _implementation.id();
return _implementation->id();
}
};

View File

@ -22,7 +22,7 @@ class ConsumedMessage;
/**
* Class definition
*/
class ChannelImpl : public Watchable
class ChannelImpl : public Watchable, public std::enable_shared_from_this<ChannelImpl>
{
private:
/**
@ -91,11 +91,14 @@ private:
*
* We store the data as well as whether they
* should be handled synchronously.
*
* @var std::queue
*/
std::queue<std::pair<bool, OutBuffer>> _queue;
/**
* Are we currently operating in synchronous mode?
* @var bool
*/
bool _synchronous = false;
@ -106,16 +109,11 @@ private:
ConsumedMessage *_message = nullptr;
/**
* Construct a channel object
*
* Note that the constructor is private, and that the Channel class is
* a friend. By doing this we ensure that nobody can instantiate this
* object, and that it can thus only be used inside the library.
*
* @param parent the public channel object
* Constructor to make a shared pointer
* @param parent the publis channel object
* @param connection pointer to the connection
*/
ChannelImpl(Channel *parent, Connection *connection);
static std::shared_ptr<ChannelImpl> instantiate(Channel *parent, Connection *connection);
/**
* Push a deferred result
@ -131,6 +129,18 @@ private:
*/
Deferred &push(const Frame &frame);
protected:
/**
* Construct a channel object
*
* Note that the constructor is private, and that the Channel class is
* a friend. By doing this we ensure that nobody can instantiate this
* object, and that it can thus only be used inside the library.
*
* @param parent the public channel object
* @param connection pointer to the connection
*/
ChannelImpl(Channel *parent, Connection *connection);
public:
/**
@ -475,11 +485,19 @@ public:
bool send(const Frame &frame);
/**
* Signal the channel that a synchronous operation
* was completed. After this operation, waiting
* frames can be sent out.
* Is this channel waiting for an answer before it can send furher instructions
* @return bool
*/
void synchronized();
bool waiting() const
{
return _synchronous;
}
/**
* Signal the channel that a synchronous operation was completed.
* After this operation, waiting frames can be sent out.
*/
void onSynchronized();
/**
* Report to the handler that the channel is opened
@ -493,36 +511,48 @@ public:
if (_readyCallback) _readyCallback();
// if the monitor is still valid, we exit synchronous mode now
if (monitor.valid()) synchronized();
if (monitor.valid()) onSynchronized();
}
/**
* Report to the handler that the channel is closed
*
* Returns whether the channel object is still valid
*
* @return bool
*/
bool reportClosed()
{
// change state
_state = state_closed;
_synchronous = false;
// create a monitor, because the callbacks could destruct the current object
Monitor monitor(this);
// and pass on to the reportSuccess() method which will call the
// appropriate deferred object to report the successful operation
return reportSuccess();
// technically, we should exit synchronous method now
// since the synchronous channel close frame has been
// acknowledged by the server.
//
// but since the channel was just closed, there is no
// real point in doing this, as we cannot send frames
// out anymore.
bool result = reportSuccess();
// leap out if object no longer exists
if (!monitor.valid()) return result;
// all later deferred objects should report an error, because it
// was not possible to complete the instruction as the channel is
// now closed
reportError("Channel has been closed", false);
// done
return result;
}
/**
* Report success
*
* Returns whether the channel object is still valid
*
* @param mixed
* @return bool
*/
template <typename... Arguments>
bool reportSuccess(Arguments ...parameters)
@ -558,54 +588,7 @@ public:
* @param message the error message
* @param notifyhandler should the channel-wide handler also be called?
*/
void reportError(const char *message, bool notifyhandler = true)
{
// change state
_state = state_closed;
// we are going to call callbacks that could destruct the channel
Monitor monitor(this);
// call the oldest
if (_oldestCallback)
{
// copy the callback (so that it can not be destructed during
// the "reportError" call
auto cb = _oldestCallback;
// call the callback
auto *next = cb->reportError(message);
// leap out if channel no longer exists
if (!monitor.valid()) return;
// set the oldest callback
_oldestCallback.reset(next);
}
// clean up all deferred other objects
while (_oldestCallback)
{
// copy the callback (so that it can not be destructed during
// the "reportError" call
auto cb = _oldestCallback;
// call the callback
auto *next = cb->reportError("Channel is in error state");
// leap out if channel no longer exists
if (!monitor.valid()) return;
// set the oldest callback
_oldestCallback.reset(next);
}
// all callbacks have been processed, so we also can reset the pointer to the newest
_newestCallback = nullptr;
// inform handler
if (notifyhandler && _errorCallback) _errorCallback(message);
}
void reportError(const char *message, bool notifyhandler = true);
/**
* Install a consumer callback
@ -657,7 +640,6 @@ public:
* The channel class is its friend, thus can it instantiate this object
*/
friend class Channel;
};
/**

View File

@ -55,7 +55,7 @@ protected:
* All channels that are active
* @var map
*/
std::map<uint16_t, ChannelImpl*> _channels;
std::map<uint16_t, std::shared_ptr<ChannelImpl>> _channels;
/**
* The last unused channel ID
@ -100,6 +100,13 @@ protected:
*/
bool sendClose();
/**
* Is any channel waiting for an answer on a synchronous call?
* @return bool
*/
bool waiting() const;
private:
/**
* Construct an AMQP object based on full login data
@ -210,13 +217,13 @@ public:
* @param channel
* @return uint16_t
*/
uint16_t add(ChannelImpl *channel);
uint16_t add(const std::shared_ptr<ChannelImpl> &channel);
/**
* Remove a channel
* @param channel
*/
void remove(ChannelImpl *channel);
void remove(const ChannelImpl *channel);
/**
* Parse the buffer into a recognized frame
@ -269,7 +276,7 @@ public:
* @param number channel identifier
* @return channel the channel object, or nullptr if not yet created
*/
ChannelImpl *channel(int number)
std::shared_ptr<ChannelImpl> channel(int number)
{
auto iter = _channels.find(number);
return iter == _channels.end() ? nullptr : iter->second;

View File

@ -88,13 +88,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
if (channel->reportSuccess<const std::string&>(consumerTag())) channel->synchronized();
if (channel->reportSuccess<const std::string&>(consumerTag())) channel->onSynchronized();
// done
return true;

View File

@ -88,13 +88,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
if (channel->reportSuccess(consumerTag())) channel->synchronized();
if (channel->reportSuccess(consumerTag())) channel->onSynchronized();
// done
return true;

View File

@ -175,7 +175,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;

View File

@ -78,13 +78,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -165,7 +165,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;

View File

@ -109,7 +109,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;

View File

@ -61,13 +61,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -66,7 +66,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;

View File

@ -100,7 +100,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;

View File

@ -147,7 +147,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// send back an ok frame
connection->send(ChannelCloseOKFrame(this->channel()));

View File

@ -66,13 +66,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report that the channel is closed
if (channel->reportClosed()) channel->synchronized();
if (channel->reportClosed()) channel->onSynchronized();
// done
return true;

View File

@ -87,13 +87,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report success for the call
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -46,6 +46,37 @@
*/
namespace AMQP {
/**
* Derived class with public constructor
*
* We need this because std::make_shared<ChannelImpl> is not possible
*/
struct PublicChannelImpl : public ChannelImpl
{
/**
* Constructor
* @param parent
* @param connection
*/
PublicChannelImpl(Channel *parent, Connection *connection) : ChannelImpl(parent, connection) {}
/**
* Destructor
*/
virtual ~PublicChannelImpl() {}
};
/**
* Constructor to make a shared pointer
* @param parent the publis channel object
* @param connection pointer to the connection
*/
std::shared_ptr<ChannelImpl> ChannelImpl::instantiate(Channel *parent, Connection *connection)
{
// we can only use std::make_shared with a PublicChannelImpl
return std::make_shared<PublicChannelImpl>(parent, connection);
}
/**
* Construct a channel object
* @param parent
@ -57,7 +88,7 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection) :
_connection(&connection->_implementation)
{
// add the channel to the connection
_id = _connection->add(this);
_id = _connection->add(shared_from_this());
// check if the id is valid
if (_id == 0)
@ -87,9 +118,6 @@ ChannelImpl::~ChannelImpl()
// remove this channel from the connection (but not if the connection is already destructed)
if (_connection) _connection->remove(this);
// close the channel now
close();
// destruct deferred results
while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next());
}
@ -195,6 +223,9 @@ Deferred &ChannelImpl::rollbackTransaction()
*/
Deferred &ChannelImpl::close()
{
// this is completely pointless if not connected
if (_state != state_connected) return push(new Deferred(_state == state_closing));
// send a channel close frame
auto &handler = push(ChannelCloseFrame(_id));
@ -643,7 +674,13 @@ Deferred &ChannelImpl::recover(int flags)
bool ChannelImpl::send(const Frame &frame)
{
// skip if channel is not connected
if (_state != state_connected || !_connection) return false;
if (_state == state_closed || !_connection) return false;
// if we're busy closing, we pretend that the send operation was a
// success. this causes the deferred object to be created, and to be
// added to the list of deferred objects. it will be notified about
// the error when the close operation succeeds
if (_state == state_closing) return true;
// are we currently in synchronous mode or are there
// other frames waiting for their turn to be sent?
@ -666,11 +703,10 @@ bool ChannelImpl::send(const Frame &frame)
}
/**
* Signal the channel that a synchronous operation
* was completed. After this operation, waiting
* frames can be sent out.
* Signal the channel that a synchronous operation was completed. After
* this operation, waiting frames can be sent out.
*/
void ChannelImpl::synchronized()
void ChannelImpl::onSynchronized()
{
// we are no longer waiting for synchronous operations
_synchronous = false;
@ -679,7 +715,7 @@ void ChannelImpl::synchronized()
Monitor monitor(this);
// send all frames while not in synchronous mode
while (monitor.valid() && !_synchronous && !_queue.empty())
while (monitor.valid() && _connection && !_synchronous && !_queue.empty())
{
// retrieve the first buffer and synchronous
auto pair = std::move(_queue.front());
@ -707,7 +743,7 @@ void ChannelImpl::reportMessage()
Monitor monitor(this);
// synchronize the channel if this comes from a basic.get frame
if (_message->consumer().empty()) synchronized();
if (_message->consumer().empty()) onSynchronized();
// syncing the channel may destruct the channel
if (!monitor.valid()) return;
@ -729,6 +765,73 @@ void ChannelImpl::reportMessage()
delete _message; _message = nullptr;
}
/**
* Report an error message on a channel
* @param message the error message
* @param notifyhandler should the channel-wide handler also be called?
*/
void ChannelImpl::reportError(const char *message, bool notifyhandler)
{
// change state
_state = state_closed;
_synchronous = false;
// the queue of messages that still have to sent can be emptied now
// (we do this by moving the current queue into an unused variable)
auto queue(std::move(_queue));
// we are going to call callbacks that could destruct the channel
Monitor monitor(this);
// call the oldest
if (_oldestCallback)
{
// copy the callback (so that it can not be destructed during
// the "reportError" call
auto cb = _oldestCallback;
// call the callback
auto *next = cb->reportError(message);
// leap out if channel no longer exists
if (!monitor.valid()) return;
// set the oldest callback
_oldestCallback.reset(next);
}
// clean up all deferred other objects
while (_oldestCallback)
{
// copy the callback (so that it can not be destructed during
// the "reportError" call
auto cb = _oldestCallback;
// call the callback
auto *next = cb->reportError("Channel is in error state");
// leap out if channel no longer exists
if (!monitor.valid()) return;
// set the oldest callback
_oldestCallback.reset(next);
}
// all callbacks have been processed, so we also can reset the pointer to the newest
_newestCallback = nullptr;
// inform handler
if (notifyhandler && _errorCallback) _errorCallback(message);
// leap out if object no longer exists
if (!monitor.valid()) return;
// the connection now longer has to know that this channel exists,
// because the channel ID is no longer in use
if (_connection) _connection->remove(this);
}
/**
* Create an incoming message from a consume call
* @param frame

View File

@ -71,7 +71,7 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;

View File

@ -139,6 +139,15 @@ public:
return _failingMethod;
}
/**
* This frame is part of the shutdown operation
* @return bool
*/
virtual bool partOfShutdown() const override
{
return true;
}
/**
* Process the frame
* @param connection The connection over which it was received

View File

@ -54,7 +54,7 @@ ConnectionImpl::~ConnectionImpl()
* @param channel
* @return uint16_t
*/
uint16_t ConnectionImpl::add(ChannelImpl *channel)
uint16_t ConnectionImpl::add(const std::shared_ptr<ChannelImpl> &channel)
{
// check if we have exceeded the limit already
if (_maxChannels > 0 && _channels.size() >= _maxChannels) return 0;
@ -80,7 +80,7 @@ uint16_t ConnectionImpl::add(ChannelImpl *channel)
* Remove a channel
* @param channel
*/
void ConnectionImpl::remove(ChannelImpl *channel)
void ConnectionImpl::remove(const ChannelImpl *channel)
{
// skip zero channel
if (channel->id() == 0) return;
@ -144,6 +144,13 @@ size_t ConnectionImpl::parse(const Buffer &buffer)
return processed;
}
}
// leap out if the connection object no longer exists
if (!monitor.valid() || !_closed || _state == state_connected) return processed;
// the close() function was called, but if the close frame was not yet sent
// if there are no waiting channels, we can do that right now
if (!waiting()) sendClose();
// done
return processed;
@ -162,8 +169,27 @@ bool ConnectionImpl::close()
// mark that the object is closed
_closed = true;
// after the send operation the object could be dead
Monitor monitor(this);
// number of channels that is waiting for an answer and that has further data
int waiters = 0;
// loop over all channels, and close them
for (auto iter = _channels.begin(); iter != _channels.end(); iter++)
{
// close the channel
iter->second->close();
// we could be dead now
if (!monitor.valid()) return true;
// is this channel waiting for an answer
if (iter->second->waiting()) waiters++;
}
// if still busy with handshake, we delay closing for a while
if (_state == state_handshake || _state == state_protocol) return true;
if (waiters > 0 || _state == state_handshake || _state == state_protocol) return true;
// perform the close operation
sendClose();
@ -182,16 +208,6 @@ bool ConnectionImpl::sendClose()
// after the send operation the object could be dead
Monitor monitor(this);
// loop over all channels
for (auto iter = _channels.begin(); iter != _channels.end(); iter++)
{
// close the channel
iter->second->close();
// we could be dead now
if (!monitor.valid()) return false;
}
// send the close frame
send(ConnectionCloseFrame(0, "shutdown"));
@ -216,7 +232,7 @@ void ConnectionImpl::setConnected()
// if the close method was called before, the frame was not
// sent. append it to the end of the queue to make sure we
// are correctly closed down.
if (_closed && !sendClose()) return;
if (_closed && !waiting() && !sendClose()) return;
// we're going to call the handler, which can destruct the connection,
// so we must monitor if the queue object is still valid after calling
@ -239,6 +255,23 @@ void ConnectionImpl::setConnected()
}
}
/**
* Is any channel waiting for an answer on a synchronous call?
* @return bool
*/
bool ConnectionImpl::waiting() const
{
// loop through the channels
for (auto &iter : _channels)
{
// is this a waiting channel
if (iter.second->waiting()) return true;
}
// no waiting channel found
return false;
}
/**
* Send a frame over the connection
* @param frame The frame to send
@ -249,6 +282,9 @@ bool ConnectionImpl::send(const Frame &frame)
// its not possible to send anything if closed or closing down
if (_state == state_closing || _state == state_closed) return false;
// some frames can be sent _after_ the close() function was called
if (_closed && !frame.partOfShutdown()) return false;
// we need an output buffer
OutBuffer buffer(frame.buffer());

View File

@ -61,13 +61,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -64,13 +64,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report exchange declare ok
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -65,13 +65,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -62,13 +62,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -48,6 +48,12 @@ public:
*/
virtual bool partOfHandshake() const { return false; }
/**
* Is this a frame that is part of the connection close operation?
* @return bool
*/
virtual bool partOfShutdown() const { return false; }
/**
* Does this frame need an end-of-frame seperator?
* @return bool

View File

@ -63,13 +63,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -127,13 +127,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// what if channel doesn't exist?
if (!channel) return false;
// report success
if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->synchronized();
if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->onSynchronized();
// done
return true;

View File

@ -88,13 +88,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue deletion success
if (channel->reportSuccess(this->messageCount())) channel->synchronized();
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
// done
return true;

View File

@ -88,13 +88,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue purge success
if (channel->reportSuccess(this->messageCount())) channel->synchronized();
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
// done
return true;

View File

@ -67,13 +67,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue unbind success
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -68,13 +68,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -68,13 +68,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;

View File

@ -68,13 +68,13 @@ public:
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->synchronized();
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;