Removed the nowait option from the public interface, because the deferred would never be called and implemented a queue to wait for synchronous methods to complete before sending the next frame

This commit is contained in:
Martijn Otto 2014-04-29 15:51:33 +02:00
parent e0b709fa63
commit a9570277b7
43 changed files with 524 additions and 237 deletions

View File

@ -174,42 +174,30 @@ public:
/** /**
* Bind two exchanges to each other * Bind two exchanges to each other
* *
* The following flags can be used for the exchange
*
* - nowait do not wait on response
*
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags
* @param arguments additional bind arguments * @param arguments additional bind arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, arguments); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation.bindExchange(source, target, routingkey, Table()); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); }
/** /**
* Unbind two exchanges from one another * Unbind two exchanges from one another
* *
* The following flags can be used for the exchange
*
* - nowait do not wait on response
*
* @param target the target exchange * @param target the target exchange
* @param source the source exchange * @param source the source exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, arguments); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation.unbindExchange(target, source, routingkey, Table()); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); }
/** /**
* Declare a queue * Declare a queue
@ -250,22 +238,16 @@ public:
/** /**
* Bind a queue to an exchange * Bind a queue to an exchange
* *
* The following flags can be used for the exchange
*
* - nowait do not wait on response
*
* @param exchange the source exchange * @param exchange the source exchange
* @param queue the target queue * @param queue the target queue
* @param routingkey the routing key * @param routingkey the routing key
* @param flags additional flags
* @param arguments additional bind arguments * @param arguments additional bind arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, arguments); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.bindQueue(exchange, queue, routingkey, Table()); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
/** /**
* Unbind a queue from an exchange * Unbind a queue from an exchange
@ -283,12 +265,7 @@ public:
/** /**
* Purge a queue * Purge a queue
* *
* The following flags can be used for the exchange
*
* - nowait do not wait on response
*
* @param name name of the queue * @param name name of the queue
* @param flags additional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -303,7 +280,7 @@ public:
* *
* }); * });
*/ */
DeferredDelete &purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } DeferredDelete &purgeQueue(const std::string &name){ return _implementation.purgeQueue(name); }
/** /**
* Remove a queue * Remove a queue
@ -375,11 +352,6 @@ public:
* - nolocal if set, messages published on this channel are not also consumed * - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically * - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue * - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
*
* The method Deferred::onSuccess() will be called when the
* consumer has started (unless the nowait option was set, in which case
* no confirmation method is called)
* *
* @param queue the queue from which you want to consume * @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation * @param tag a consumer tag that will be associated with this consume operation
@ -411,16 +383,7 @@ public:
* *
* If you want to stop a running consumer, you can use this method with the consumer tag * If you want to stop a running consumer, you can use this method with the consumer tag
* *
* The following flags are supported:
*
* - nowait the server does not have to send a response back that the consumer has been cancelled
*
* The method Deferred::onSuccess() will be called when the consumer
* was succesfully stopped (unless the nowait option was used, in which case no
* confirmation method is called)
*
* @param tag the consumer tag * @param tag the consumer tag
* @param flags optional additional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -435,7 +398,7 @@ public:
* *
* }); * });
*/ */
DeferredCancel &cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); }
/** /**
* Acknoldge a received message * Acknoldge a received message

View File

@ -58,18 +58,18 @@ private:
/** /**
* Pointer to the oldest deferred result (the first one that is going * Pointer to the oldest deferred result (the first one that is going
* to be executed) * to be executed)
* *
* @var Deferred * @var Deferred
*/ */
std::unique_ptr<Deferred> _oldestCallback = nullptr; std::unique_ptr<Deferred> _oldestCallback = nullptr;
/** /**
* Pointer to the newest deferred result (the last one to be added). * Pointer to the newest deferred result (the last one to be added).
* *
* @var Deferred * @var Deferred
*/ */
Deferred *_newestCallback = nullptr; Deferred *_newestCallback = nullptr;
/** /**
* The channel number * The channel number
* @var uint16_t * @var uint16_t
@ -86,6 +86,19 @@ private:
state_closed state_closed
} _state = state_connected; } _state = state_connected;
/**
* The frames that still need to be send out
*
* We store the data as well as whether they
* should be handled synchronously.
*/
std::queue<std::pair<bool, OutBuffer>> _queue;
/**
* Are we currently operating in synchronous mode?
*/
bool _synchronous = false;
/** /**
* The message that is now being received * The message that is now being received
* @var ConsumedMessage * @var ConsumedMessage
@ -203,13 +216,12 @@ public:
* @param source exchange which binds to target * @param source exchange which binds to target
* @param target exchange to bind to * @param target exchange to bind to
* @param routingKey routing key * @param routingKey routing key
* @param glags additional flags
* @param arguments additional arguments for binding * @param arguments additional arguments for binding
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments);
/** /**
* unbind two exchanges * unbind two exchanges
@ -217,13 +229,12 @@ public:
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments);
/** /**
* remove an exchange * remove an exchange
@ -253,13 +264,12 @@ public:
* @param exchangeName name of the exchange to bind to * @param exchangeName name of the exchange to bind to
* @param queueName name of the queue * @param queueName name of the queue
* @param routingkey routingkey * @param routingkey routingkey
* @param flags additional flags
* @param arguments additional arguments * @param arguments additional arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
/** /**
* Unbind a queue from an exchange * Unbind a queue from an exchange
@ -277,7 +287,6 @@ public:
/** /**
* Purge a queue * Purge a queue
* @param queue queue to purge * @param queue queue to purge
* @param flags additional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -292,7 +301,7 @@ public:
* *
* }); * });
*/ */
DeferredDelete &purgeQueue(const std::string &name, int flags); DeferredDelete &purgeQueue(const std::string &name);
/** /**
* Remove a queue * Remove a queue
@ -363,7 +372,6 @@ public:
/** /**
* Cancel a running consumer * Cancel a running consumer
* @param tag the consumer tag * @param tag the consumer tag
* @param flags optional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -378,7 +386,7 @@ public:
* *
* }); * });
*/ */
DeferredCancel &cancel(const std::string &tag, int flags); DeferredCancel &cancel(const std::string &tag);
/** /**
* Acknowledge a message * Acknowledge a message
@ -429,54 +437,79 @@ public:
*/ */
bool send(const Frame &frame); bool send(const Frame &frame);
/**
* Signal the channel that a synchronous operation
* was completed. After this operation, waiting
* frames can be sent out.
*/
void synchronized();
/** /**
* Report to the handler that the channel is opened * Report to the handler that the channel is opened
*/ */
void reportReady() void reportReady()
{ {
// callbacks could destroy us, so monitor it
Monitor monitor(this);
// inform handler // inform handler
if (_readyCallback) _readyCallback(); if (_readyCallback) _readyCallback();
// if the monitor is still valid, we exit synchronous mode now
if (monitor.valid()) synchronized();
} }
/** /**
* Report to the handler that the channel is closed * Report to the handler that the channel is closed
*
* Returns whether the channel object is still valid
*/ */
void reportClosed() bool reportClosed()
{ {
// change state // change state
_state = state_closed; _state = state_closed;
// and pass on to the reportSuccess() method which will call the // and pass on to the reportSuccess() method which will call the
// appropriate deferred object to report the successful operation // appropriate deferred object to report the successful operation
reportSuccess(); return reportSuccess();
// technically, we should exit synchronous method now
// since the synchronous channel close frame has been
// acknowledged by the server.
//
// but since the channel was just closed, there is no
// real point in doing this, as we cannot send frames
// out anymore.
} }
/** /**
* Report success * Report success
* *
* This function is called to report success for all * Returns whether the channel object is still valid
* cases where the callback does not receive any parameters
*/ */
template <typename... Arguments> template <typename... Arguments>
void reportSuccess(Arguments ...parameters) bool reportSuccess(Arguments ...parameters)
{ {
// skip if there is no oldest callback // skip if there is no oldest callback
if (!_oldestCallback) return; if (!_oldestCallback) return true;
// we are going to call callbacks that could destruct the channel // we are going to call callbacks that could destruct the channel
Monitor monitor(this); Monitor monitor(this);
// call the callback // call the callback
auto *next = _oldestCallback->reportSuccess(std::forward<Arguments>(parameters)...); auto *next = _oldestCallback->reportSuccess(std::forward<Arguments>(parameters)...);
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return false;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback.reset(next);
// if there was no next callback, the newest callback was just used // if there was no next callback, the newest callback was just used
if (!next) _newestCallback = nullptr; if (!next) _newestCallback = nullptr;
// we are still valid
return true;
} }
/** /**
@ -497,30 +530,30 @@ public:
{ {
// call the callback // call the callback
auto *next = _oldestCallback->reportError(message); auto *next = _oldestCallback->reportError(message);
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback.reset(next);
} }
// clean up all deferred other objects // clean up all deferred other objects
while (_oldestCallback) while (_oldestCallback)
{ {
// call the callback // call the callback
auto *next = _oldestCallback->reportError("Channel is in error state"); auto *next = _oldestCallback->reportError("Channel is in error state");
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback.reset(next);
} }
// all callbacks have been processed, so we also can reset the pointer to the newest // all callbacks have been processed, so we also can reset the pointer to the newest
_newestCallback = nullptr; _newestCallback = nullptr;
// inform handler // inform handler
if (notifyhandler && _errorCallback) _errorCallback(message); if (notifyhandler && _errorCallback) _errorCallback(message);
} }
@ -534,7 +567,7 @@ public:
{ {
// install the callback if it is assigned // install the callback if it is assigned
if (callback) _consumers[consumertag] = callback; if (callback) _consumers[consumertag] = callback;
// otherwise we erase the previously set callback // otherwise we erase the previously set callback
else _consumers.erase(consumertag); else _consumers.erase(consumertag);
} }

View File

@ -253,6 +253,13 @@ public:
*/ */
bool send(const Frame &frame); bool send(const Frame &frame);
/**
* Send buffered data over the connection
*
* @param buffer the buffer with data to send
*/
bool send(OutBuffer &&buffer);
/** /**
* Get a channel by its identifier * Get a channel by its identifier
* *

View File

@ -26,7 +26,6 @@ extern const int global;
extern const int nolocal; extern const int nolocal;
extern const int noack; extern const int noack;
extern const int exclusive; extern const int exclusive;
extern const int nowait;
extern const int mandatory; extern const int mandatory;
extern const int immediate; extern const int immediate;
extern const int redelivered; extern const int redelivered;

View File

@ -24,7 +24,7 @@ private:
* @var char* * @var char*
*/ */
char *_buffer; char *_buffer;
/** /**
* Pointer to the buffer to be filled * Pointer to the buffer to be filled
* @var char* * @var char*
@ -36,13 +36,13 @@ private:
* @var size_t * @var size_t
*/ */
size_t _size; size_t _size;
/** /**
* The total capacity of the out buffer * The total capacity of the out buffer
* @var size_t * @var size_t
*/ */
size_t _capacity; size_t _capacity;
public: public:
/** /**
@ -56,7 +56,7 @@ public:
_capacity = capacity; _capacity = capacity;
_buffer = _current = new char[capacity]; _buffer = _current = new char[capacity];
} }
/** /**
* Copy constructor * Copy constructor
* @param that * @param that
@ -68,11 +68,11 @@ public:
_capacity = that._capacity; _capacity = that._capacity;
_buffer = new char[_capacity]; _buffer = new char[_capacity];
_current = _buffer + _size; _current = _buffer + _size;
// copy memory // copy memory
memcpy(_buffer, that._buffer, _size); memcpy(_buffer, that._buffer, _size);
} }
/** /**
* Move constructor * Move constructor
* @param that * @param that
@ -84,7 +84,7 @@ public:
_capacity = that._capacity; _capacity = that._capacity;
_buffer = that._buffer; _buffer = that._buffer;
_current = that._current; _current = that._current;
// reset the other object // reset the other object
that._size = 0; that._size = 0;
that._capacity = 0; that._capacity = 0;
@ -95,7 +95,7 @@ public:
/** /**
* Destructor * Destructor
*/ */
virtual ~OutBuffer() virtual ~OutBuffer()
{ {
if (_buffer) delete[] _buffer; if (_buffer) delete[] _buffer;
} }
@ -104,7 +104,7 @@ public:
* Get access to the internal buffer * Get access to the internal buffer
* @return const char* * @return const char*
*/ */
const char *data() const char *data() const
{ {
return _buffer; return _buffer;
} }
@ -113,7 +113,7 @@ public:
* Current size of the output buffer * Current size of the output buffer
* @return size_t * @return size_t
*/ */
size_t size() size_t size() const
{ {
return _size; return _size;
} }

View File

@ -1,6 +1,6 @@
/** /**
* Class describing a basic acknowledgement frame * Class describing a basic acknowledgement frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -38,10 +38,10 @@ protected:
{ {
// call base // call base
BasicFrame::fill(buffer); BasicFrame::fill(buffer);
// add the delivery tag // add the delivery tag
buffer.add(_deliveryTag); buffer.add(_deliveryTag);
// add the booleans // add the booleans
_multiple.fill(buffer); _multiple.fill(buffer);
} }
@ -54,25 +54,36 @@ public:
* @param deliveryTag server-assigned and channel specific delivery tag * @param deliveryTag server-assigned and channel specific delivery tag
* @param multiple acknowledge mutiple messages * @param multiple acknowledge mutiple messages
*/ */
BasicAckFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false) : BasicAckFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false) :
BasicFrame(channel, 9), BasicFrame(channel, 9),
_deliveryTag(deliveryTag), _deliveryTag(deliveryTag),
_multiple(multiple) {} _multiple(multiple) {}
/** /**
* Construct based on received frame * Construct based on received frame
* @param frame * @param frame
*/ */
BasicAckFrame(ReceivedFrame &frame) : BasicAckFrame(ReceivedFrame &frame) :
BasicFrame(frame), BasicFrame(frame),
_deliveryTag(frame.nextUint64()), _deliveryTag(frame.nextUint64()),
_multiple(frame) {} _multiple(frame) {}
/** /**
* Destructor * Destructor
*/ */
virtual ~BasicAckFrame() {} virtual ~BasicAckFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
virtual bool synchronous() const override
{
return false;
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -67,7 +67,19 @@ public:
* Destructor * Destructor
*/ */
virtual ~BasicCancelFrame() {} virtual ~BasicCancelFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous when the nowait option is not used
return !noWait();
}
/** /**
* Return the consumertag, which is specified by the client or provided by the server * Return the consumertag, which is specified by the client or provided by the server
* @return string * @return string
@ -90,7 +102,7 @@ public:
* Return whether to wait for a response * Return whether to wait for a response
* @return boolean * @return boolean
*/ */
const bool noWait() const bool noWait() const
{ {
return _noWait.get(0); return _noWait.get(0);
} }

View File

@ -94,7 +94,7 @@ public:
if (!channel) return false; if (!channel) return false;
// report // report
channel->reportSuccess<const std::string&>(consumerTag()); if (channel->reportSuccess<const std::string&>(consumerTag())) channel->synchronized();
// done // done
return true; return true;

View File

@ -111,6 +111,18 @@ public:
*/ */
virtual ~BasicConsumeFrame() {} virtual ~BasicConsumeFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous when the nowait option is not set
return !noWait();
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -94,7 +94,7 @@ public:
if (!channel) return false; if (!channel) return false;
// report // report
channel->reportSuccess(consumerTag()); if (channel->reportSuccess(consumerTag())) channel->synchronized();
// done // done
return true; return true;

View File

@ -120,6 +120,17 @@ public:
return _routingKey; return _routingKey;
} }
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
virtual bool synchronous() const override
{
return false;
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -94,6 +94,17 @@ public:
*/ */
virtual ~BasicPublishFrame() {} virtual ~BasicPublishFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
return false;
}
/** /**
* Return the name of the exchange to publish to * Return the name of the exchange to publish to
* @return string * @return string

View File

@ -67,7 +67,7 @@ public:
if (!channel) return false; if (!channel) return false;
// report // report
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -62,6 +62,17 @@ public:
*/ */
virtual ~BasicRecoverAsyncFrame() {} virtual ~BasicRecoverAsyncFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
virtual bool synchronous() const override
{
return false;
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -62,6 +62,17 @@ public:
*/ */
virtual ~BasicRecoverFrame() {} virtual ~BasicRecoverFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
return false;
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -73,6 +73,17 @@ public:
*/ */
virtual ~BasicRejectFrame() {} virtual ~BasicRejectFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
return false;
}
/** /**
* Return the method ID * Return the method ID
* @return uint16_t * @return uint16_t

View File

@ -92,6 +92,17 @@ public:
*/ */
virtual ~BasicReturnFrame() {} virtual ~BasicReturnFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
virtual bool synchronous() const override
{
return false;
}
/** /**
* Return the name of the exchange to publish to * Return the name of the exchange to publish to
* @return string * @return string

View File

@ -1,6 +1,6 @@
/** /**
* Class describing a channel close acknowledgement frame * Class describing a channel close acknowledgement frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -67,13 +67,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// report that the channel is closed // report that the channel is closed
channel->reportClosed(); if (channel->reportClosed()) channel->synchronized();
// done // done
return true; return true;
} }

View File

@ -93,7 +93,7 @@ public:
if (!channel) return false; if (!channel) return false;
// report success for the call // report success for the call
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -85,7 +85,7 @@ ChannelImpl::~ChannelImpl()
// close the channel now // close the channel now
close(); close();
// destruct deferred results // destruct deferred results
while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next()); while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next());
} }
@ -98,13 +98,13 @@ Deferred &ChannelImpl::push(Deferred *deferred)
{ {
// do we already have an oldest? // do we already have an oldest?
if (!_oldestCallback) _oldestCallback.reset(deferred); if (!_oldestCallback) _oldestCallback.reset(deferred);
// do we already have a newest? // do we already have a newest?
if (_newestCallback) _newestCallback->add(deferred); if (_newestCallback) _newestCallback->add(deferred);
// store newest callback // store newest callback
_newestCallback = deferred; _newestCallback = deferred;
// done // done
return *deferred; return *deferred;
} }
@ -222,7 +222,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ
if (type == ExchangeType::headers)exchangeType = "headers"; if (type == ExchangeType::headers)exchangeType = "headers";
// send declare exchange frame // send declare exchange frame
return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, false, arguments));
} }
/** /**
@ -231,16 +231,15 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ
* @param source exchange which binds to target * @param source exchange which binds to target
* @param target exchange to bind to * @param target exchange to bind to
* @param routingKey routing key * @param routingKey routing key
* @param flags additional flags
* @param arguments additional arguments for binding * @param arguments additional arguments for binding
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
{ {
// send exchange bind frame // send exchange bind frame
return push(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); return push(ExchangeBindFrame(_id, target, source, routingkey, false, arguments));
} }
/** /**
@ -249,16 +248,15 @@ Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments)
{ {
// send exchange unbind frame // send exchange unbind frame
return push(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); return push(ExchangeUnbindFrame(_id, target, source, routingkey, false, arguments));
} }
/** /**
@ -273,7 +271,7 @@ Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::stri
Deferred &ChannelImpl::removeExchange(const std::string &name, int flags) Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
{ {
// send delete exchange frame // send delete exchange frame
return push(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); return push(ExchangeDeleteFrame(_id, name, flags & ifunused, false));
} }
/** /**
@ -288,14 +286,14 @@ Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
{ {
// the frame to send // the frame to send
QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments); QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments);
// send the queuedeclareframe // send the queuedeclareframe
auto *result = new DeferredQueue(send(frame)); auto *result = new DeferredQueue(send(frame));
// add the deferred result // add the deferred result
push(result); push(result);
// done // done
return *result; return *result;
} }
@ -306,16 +304,15 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con
* @param exchangeName name of the exchange to bind to * @param exchangeName name of the exchange to bind to
* @param queueName name of the queue * @param queueName name of the queue
* @param routingkey routingkey * @param routingkey routingkey
* @param flags additional flags
* @param arguments additional arguments * @param arguments additional arguments
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
*/ */
Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments)
{ {
// send the bind queue frame // send the bind queue frame
return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, false, arguments));
} }
/** /**
@ -338,7 +335,6 @@ Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::strin
/** /**
* Purge a queue * Purge a queue
* @param queue queue to purge * @param queue queue to purge
* @param flags additional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -353,17 +349,17 @@ Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::strin
* *
* }); * });
*/ */
DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags) DeferredDelete &ChannelImpl::purgeQueue(const std::string &name)
{ {
// the frame to send // the frame to send
QueuePurgeFrame frame(_id, name, flags & nowait); QueuePurgeFrame frame(_id, name, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame)); auto *deferred = new DeferredDelete(send(frame));
// push to list // push to list
push(deferred); push(deferred);
// done // done
return *deferred; return *deferred;
} }
@ -389,14 +385,14 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags)
DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
{ {
// the frame to send // the frame to send
QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, flags & nowait); QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame)); auto *deferred = new DeferredDelete(send(frame));
// push to list // push to list
push(deferred); push(deferred);
// done // done
return *deferred; return *deferred;
} }
@ -495,14 +491,14 @@ Deferred &ChannelImpl::setQos(uint16_t prefetchCount)
DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
{ {
// the frame to send // the frame to send
BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments); BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredConsumer(this, send(frame)); auto *deferred = new DeferredConsumer(this, send(frame));
// push to list // push to list
push(deferred); push(deferred);
// done // done
return *deferred; return *deferred;
} }
@ -510,7 +506,6 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
/** /**
* Cancel a running consumer * Cancel a running consumer
* @param tag the consumer tag * @param tag the consumer tag
* @param flags optional flags
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
@ -525,17 +520,17 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
* *
* }); * });
*/ */
DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags) DeferredCancel &ChannelImpl::cancel(const std::string &tag)
{ {
// the cancel frame to send // the cancel frame to send
BasicCancelFrame frame(_id, tag, flags & nowait); BasicCancelFrame frame(_id, tag, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredCancel(this, send(frame)); auto *deferred = new DeferredCancel(this, send(frame));
// push to list // push to list
push(deferred); push(deferred);
// done // done
return *deferred; return *deferred;
} }
@ -587,10 +582,56 @@ bool ChannelImpl::send(const Frame &frame)
// skip if channel is not connected // skip if channel is not connected
if (_state != state_connected || !_connection) return false; if (_state != state_connected || !_connection) return false;
// are we currently in synchronous mode or are there
// other frames waiting for their turn to be sent?
if (_synchronous || !_queue.empty())
{
// we need to wait until the synchronous frame has
// been processed, so queue the frame until it was
_queue.emplace(frame.synchronous(), frame.buffer());
// it was of course not actually sent but we pretend
// that it was, because no error occured
return true;
}
// enter synchronous mode if necessary
_synchronous = frame.synchronous();
// send to tcp connection // send to tcp connection
return _connection->send(frame); return _connection->send(frame);
} }
/**
* Signal the channel that a synchronous operation
* was completed. After this operation, waiting
* frames can be sent out.
*/
void ChannelImpl::synchronized()
{
// we are no longer waiting for synchronous operations
_synchronous = false;
// we need to monitor the channel for validity
Monitor monitor(this);
// send all frames while not in synchronous mode
while (monitor.valid() && !_synchronous && !_queue.empty())
{
// retrieve the first buffer and synchronous
auto pair = std::move(_queue.front());
// remove from the list
_queue.pop();
// mark as synchronous if necessary
_synchronous = pair.first;
// send it over the connection
_connection->send(std::move(pair.second));
}
}
/** /**
* Report the received message * Report the received message
*/ */
@ -602,16 +643,16 @@ void ChannelImpl::reportMessage()
// look for the consumer // look for the consumer
auto iter = _consumers.find(_message->consumer()); auto iter = _consumers.find(_message->consumer());
if (iter == _consumers.end()) return; if (iter == _consumers.end()) return;
// is this a valid callback method // is this a valid callback method
if (!iter->second) return; if (!iter->second) return;
// after the report the channel may be destructed, monitor that // after the report the channel may be destructed, monitor that
Monitor monitor(this); Monitor monitor(this);
// call the callback // call the callback
_message->report(iter->second); _message->report(iter->second);
// skip if channel was destructed // skip if channel was destructed
if (!monitor.valid()) return; if (!monitor.valid()) return;
@ -628,7 +669,7 @@ ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame)
{ {
// destruct if message is already set // destruct if message is already set
if (_message) delete _message; if (_message) delete _message;
// construct a message // construct a message
return _message = new ConsumedMessage(frame); return _message = new ConsumedMessage(frame);
} }

View File

@ -213,9 +213,9 @@ void ConnectionImpl::setConnected()
// store connected state // store connected state
_state = state_connected; _state = state_connected;
// if the close operation was already called, we do that again now again // if the close method was called before, the frame was not
// so that the actual messages to close down the connection and the channel // sent. append it to the end of the queue to make sure we
// are appended to the queue // are correctly closed down.
if (_closed && !sendClose()) return; if (_closed && !sendClose()) return;
// we're going to call the handler, which can destruct the connection, // we're going to call the handler, which can destruct the connection,
@ -225,11 +225,8 @@ void ConnectionImpl::setConnected()
// inform handler // inform handler
_handler->onConnected(_parent); _handler->onConnected(_parent);
// leap out if the connection no longer exists
if (!monitor.valid()) return;
// empty the queue of messages // empty the queue of messages
while (!_queue.empty()) while (monitor.valid() && !_queue.empty())
{ {
// get the next message // get the next message
OutBuffer buffer(std::move(_queue.front())); OutBuffer buffer(std::move(_queue.front()));
@ -239,9 +236,6 @@ void ConnectionImpl::setConnected()
// send it // send it
_handler->onData(_parent, buffer.data(), buffer.size()); _handler->onData(_parent, buffer.data(), buffer.size());
// leap out if the connection was destructed
if (!monitor.valid()) return;
} }
} }
@ -256,16 +250,10 @@ bool ConnectionImpl::send(const Frame &frame)
if (_state == state_closing || _state == state_closed) return false; if (_state == state_closing || _state == state_closed) return false;
// we need an output buffer // we need an output buffer
OutBuffer buffer(frame.totalSize()); OutBuffer buffer(frame.buffer());
// fill the buffer
frame.fill(buffer);
// append an end of frame byte (but not when still negotiating the protocol)
if (frame.needsSeparator()) buffer.add((uint8_t)206);
// are we still setting up the connection? // are we still setting up the connection?
if ((_state == state_connected && _queue.size() == 0) || frame.partOfHandshake()) if ((_state == state_connected && _queue.empty()) || frame.partOfHandshake())
{ {
// send the buffer // send the buffer
_handler->onData(_parent, buffer.data(), buffer.size()); _handler->onData(_parent, buffer.data(), buffer.size());
@ -280,6 +268,32 @@ bool ConnectionImpl::send(const Frame &frame)
return true; return true;
} }
/**
* Send buffered data over the connection
*
* @param buffer the buffer with data to send
*/
bool ConnectionImpl::send(OutBuffer &&buffer)
{
// this only works when we are already connected
if (_state != state_connected) return false;
// are we waiting for other frames to be sent before us?
if (_queue.empty())
{
// send it directly
_handler->onData(_parent, buffer.data(), buffer.size());
}
else
{
// add to the list of waiting buffers
_queue.push(std::move(buffer));
}
// done
return true;
}
/** /**
* End of namspace * End of namspace
*/ */

View File

@ -1,6 +1,6 @@
/** /**
* Exchangebindframe.h * Exchangebindframe.h
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -26,25 +26,25 @@ private:
* @var ShortString * @var ShortString
*/ */
ShortString _destination; ShortString _destination;
/** /**
* Exchange which is bound * Exchange which is bound
* @var ShortString * @var ShortString
*/ */
ShortString _source; ShortString _source;
/** /**
* Routing key * Routing key
* @var ShortString * @var ShortString
*/ */
ShortString _routingKey; ShortString _routingKey;
/** /**
* contains: nowait do not wait on response * contains: nowait do not wait on response
* @var booleanset * @var booleanset
*/ */
BooleanSet _bools; BooleanSet _bools;
/** /**
* Additional arguments * Additional arguments
* @var Table * @var Table
@ -61,7 +61,7 @@ protected:
{ {
// call base // call base
ExchangeFrame::fill(buffer); ExchangeFrame::fill(buffer);
buffer.add(_reserved); buffer.add(_reserved);
_destination.fill(buffer); _destination.fill(buffer);
_source.fill(buffer); _source.fill(buffer);
@ -70,13 +70,13 @@ protected:
_arguments.fill(buffer); _arguments.fill(buffer);
} }
public: public:
/** /**
* Constructor based on incoming data * Constructor based on incoming data
* *
* @param frame received frame to decode * @param frame received frame to decode
*/ */
ExchangeBindFrame(ReceivedFrame &frame) : ExchangeBindFrame(ReceivedFrame &frame) :
ExchangeFrame(frame), ExchangeFrame(frame),
_reserved(frame.nextUint16()), _reserved(frame.nextUint16()),
_destination(frame), _destination(frame),
@ -102,8 +102,19 @@ public:
_bools(noWait), _bools(noWait),
_arguments(arguments) _arguments(arguments)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous when the nowait option has not been set
return !noWait();
}
/** /**
* Get the destination exchange * Get the destination exchange
* @return string * @return string
@ -112,7 +123,7 @@ public:
{ {
return _destination; return _destination;
} }
/** /**
* Get the source exchange * Get the source exchange
* @return string * @return string
@ -121,7 +132,7 @@ public:
{ {
return _source; return _source;
} }
/** /**
* Get the routing key * Get the routing key
* @return string * @return string
@ -130,7 +141,7 @@ public:
{ {
return _routingKey; return _routingKey;
} }
/** /**
* Get the method id * Get the method id
* @return uint16_t * @return uint16_t
@ -139,7 +150,7 @@ public:
{ {
return 30; return 30;
} }
/** /**
* Get the additional arguments * Get the additional arguments
* @return Table * @return Table
@ -148,12 +159,12 @@ public:
{ {
return _arguments; return _arguments;
} }
/** /**
* Get the nowait bool * Get the nowait bool
* @return bool * @return bool
*/ */
bool noWait() bool noWait() const
{ {
return _bools.get(0); return _bools.get(0);
} }

View File

@ -67,7 +67,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -106,6 +106,18 @@ public:
*/ */
virtual ~ExchangeDeclareFrame() {} virtual ~ExchangeDeclareFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* Method id * Method id
* @return uint16_t * @return uint16_t

View File

@ -70,7 +70,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report exchange declare ok // report exchange declare ok
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -83,6 +83,18 @@ public:
*/ */
virtual ~ExchangeDeleteFrame() {} virtual ~ExchangeDeleteFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* returns the method id * returns the method id
* @return uint16_t * @return uint16_t

View File

@ -71,7 +71,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -8,7 +8,7 @@
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/** /**
* Class definition * Class definition
*/ */
@ -26,31 +26,31 @@ private:
* @var ShortString * @var ShortString
*/ */
ShortString _destination; ShortString _destination;
/** /**
* Exchange which is bound * Exchange which is bound
* @var ShortString * @var ShortString
*/ */
ShortString _source; ShortString _source;
/** /**
* Routing key * Routing key
* @var ShortString * @var ShortString
*/ */
ShortString _routingKey; ShortString _routingKey;
/** /**
* contains: nowait do not wait on response * contains: nowait do not wait on response
* @var booleanset * @var booleanset
*/ */
BooleanSet _bools; BooleanSet _bools;
/** /**
* Additional arguments * Additional arguments
* @var Table * @var Table
*/ */
Table _arguments; Table _arguments;
protected: protected:
/** /**
* Encode a frame on a string buffer * Encode a frame on a string buffer
@ -61,7 +61,7 @@ protected:
{ {
// call base // call base
ExchangeFrame::fill(buffer); ExchangeFrame::fill(buffer);
buffer.add(_reserved); buffer.add(_reserved);
_destination.fill(buffer); _destination.fill(buffer);
_source.fill(buffer); _source.fill(buffer);
@ -73,10 +73,10 @@ protected:
public: public:
/** /**
* Constructor based on incoming data * Constructor based on incoming data
* *
* @param frame received frame to decode * @param frame received frame to decode
*/ */
ExchangeUnbindFrame(ReceivedFrame &frame) : ExchangeUnbindFrame(ReceivedFrame &frame) :
ExchangeFrame(frame), ExchangeFrame(frame),
_reserved(frame.nextUint16()), _reserved(frame.nextUint16()),
_destination(frame), _destination(frame),
@ -102,8 +102,19 @@ public:
_bools(noWait), _bools(noWait),
_arguments(arguments) _arguments(arguments)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* Get the destination exchange * Get the destination exchange
* @return string * @return string
@ -112,7 +123,7 @@ public:
{ {
return _destination; return _destination;
} }
/** /**
* Get the source exchange * Get the source exchange
* @return string * @return string
@ -121,7 +132,7 @@ public:
{ {
return _source; return _source;
} }
/** /**
* Get the routing key * Get the routing key
* @return string * @return string
@ -130,7 +141,7 @@ public:
{ {
return _routingKey; return _routingKey;
} }
/** /**
* Get the method id * Get the method id
* @return uint16_t * @return uint16_t
@ -139,7 +150,7 @@ public:
{ {
return 40; return 40;
} }
/** /**
* Get the additional arguments * Get the additional arguments
* @return Table * @return Table
@ -148,16 +159,16 @@ public:
{ {
return _arguments; return _arguments;
} }
/** /**
* Get the nowait bool * Get the nowait bool
* @return bool * @return bool
*/ */
bool noWait() bool noWait() const
{ {
return _bools.get(0); return _bools.get(0);
} }
}; };
// leave namespace // leave namespace
} }

View File

@ -68,7 +68,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -1,9 +1,9 @@
/** /**
* Frame.h * Frame.h
* *
* Base class for frames. This base class can not be constructed from outside * Base class for frames. This base class can not be constructed from outside
* the library, and is only used internally. * the library, and is only used internally.
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -11,7 +11,7 @@
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/** /**
* Class definition * Class definition
*/ */
@ -19,11 +19,11 @@ class Frame
{ {
protected: protected:
/** /**
* Protected constructor to ensure that no objects are created from * Protected constructor to ensure that no objects are created from
* outside the library * outside the library
*/ */
Frame() {} Frame() {}
public: public:
/** /**
* Destructor * Destructor
@ -35,40 +35,67 @@ public:
* @return uint32_t * @return uint32_t
*/ */
virtual uint32_t totalSize() const = 0; virtual uint32_t totalSize() const = 0;
/** /**
* Fill an output buffer * Fill an output buffer
* @param buffer * @param buffer
*/ */
virtual void fill(OutBuffer &buffer) const = 0; virtual void fill(OutBuffer &buffer) const = 0;
/** /**
* Is this a frame that is part of the connection setup? * Is this a frame that is part of the connection setup?
* @return bool * @return bool
*/ */
virtual bool partOfHandshake() const { return false; } virtual bool partOfHandshake() const { return false; }
/** /**
* Does this frame need an end-of-frame seperator? * Does this frame need an end-of-frame seperator?
* @return bool * @return bool
*/ */
virtual bool needsSeparator() const { return true; } virtual bool needsSeparator() const { return true; }
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
virtual bool synchronous() const { return false; }
/**
* Retrieve the buffer in AMQP wire-format for
* sending over the socket connection
*/
OutBuffer buffer() const
{
// we need an output buffer
OutBuffer buffer(totalSize());
// fill the buffer
fill(buffer);
// append an end of frame byte (but not when still negotiating the protocol)
if (needsSeparator()) buffer.add((uint8_t)206);
// return the created buffer
return buffer;
}
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection)
{ {
// this is an exception // this is an exception
throw ProtocolException("unimplemented frame"); throw ProtocolException("unimplemented frame");
// unreachable // unreachable
return false; return false;
} }
}; };
/** /**
* End of namespace * End of namespace
*/ */

View File

@ -49,6 +49,14 @@ public:
*/ */
virtual ~MethodFrame() {} virtual ~MethodFrame() {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override { return true; }
/** /**
* Get the message type * Get the message type
* @return uint8_t * @return uint8_t

View File

@ -110,6 +110,18 @@ public:
_arguments(frame) _arguments(frame)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* Returns the method id * Returns the method id
* @return uint16_t * @return uint16_t

View File

@ -69,7 +69,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -98,6 +98,18 @@ public:
_arguments(frame) _arguments(frame)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* returns the method id * returns the method id
* @return string * @return string
@ -156,7 +168,7 @@ public:
* returns whether to wait for a response * returns whether to wait for a response
* @return bool * @return bool
*/ */
bool noWait() bool noWait() const
{ {
return _bools.get(4); return _bools.get(4);
} }

View File

@ -133,7 +133,7 @@ public:
if (!channel) return false; if (!channel) return false;
// report success // report success
channel->reportSuccess(name(), messageCount(), consumerCount()); if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->synchronized();
// done // done
return true; return true;

View File

@ -85,6 +85,18 @@ public:
_bools(frame) _bools(frame)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* returns the method id * returns the method id
* @returns uint16_t * @returns uint16_t

View File

@ -94,7 +94,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report queue deletion success // report queue deletion success
channel->reportSuccess(this->messageCount()); if (channel->reportSuccess(this->messageCount())) channel->synchronized();
// done // done
return true; return true;

View File

@ -1,6 +1,6 @@
/** /**
* Class describing an AMQP queue purge frame * Class describing an AMQP queue purge frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -27,7 +27,7 @@ private:
ShortString _name; ShortString _name;
/** /**
* Do not wait on response * Do not wait on response
* @var BooleanSet * @var BooleanSet
*/ */
BooleanSet _noWait; BooleanSet _noWait;
@ -37,7 +37,7 @@ protected:
* Encode the frame into a buffer * Encode the frame into a buffer
* *
* @param buffer buffer to write frame to * @param buffer buffer to write frame to
*/ */
virtual void fill(OutBuffer& buffer) const override virtual void fill(OutBuffer& buffer) const override
{ {
// call base // call base
@ -63,13 +63,13 @@ public:
* @param noWait Do not wait on response * @param noWait Do not wait on response
* *
* @return newly created Queuepurgeframe * @return newly created Queuepurgeframe
*/ */
QueuePurgeFrame(uint16_t channel, const std::string& name, bool noWait = false) : QueuePurgeFrame(uint16_t channel, const std::string& name, bool noWait = false) :
QueueFrame(channel, name.length() + 4), // 1 extra for string length, 1 for bool, 2 for deprecated field QueueFrame(channel, name.length() + 4), // 1 extra for string length, 1 for bool, 2 for deprecated field
_name(name), _name(name),
_noWait(noWait) _noWait(noWait)
{} {}
/** /**
* Constructor based on received data * Constructor based on received data
* @param frame received frame * @param frame received frame
@ -80,11 +80,23 @@ public:
_name(frame), _name(frame),
_noWait(frame) _noWait(frame)
{} {}
/**
* Is this a synchronous frame?
*
* After a synchronous frame no more frames may be
* sent until the accompanying -ok frame arrives
*/
bool synchronous() const override
{
// we are synchronous without the nowait option
return !noWait();
}
/** /**
* The method ID * The method ID
* @return method id * @return method id
*/ */
virtual uint16_t methodID() const override virtual uint16_t methodID() const override
{ {
return 30; return 30;

View File

@ -94,7 +94,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report queue purge success // report queue purge success
channel->reportSuccess(this->messageCount()); if (channel->reportSuccess(this->messageCount())) channel->synchronized();
// done // done
return true; return true;

View File

@ -73,7 +73,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report queue unbind success // report queue unbind success
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report that the channel is open // report that the channel is open
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report that the channel is open // report that the channel is open
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;

View File

@ -74,7 +74,7 @@ public:
if(!channel) return false; if(!channel) return false;
// report that the channel is open // report that the channel is open
channel->reportSuccess(); if (channel->reportSuccess()) channel->synchronized();
// done // done
return true; return true;