breaking changes:

channel.publish() now returns a DeferredConsumer object on which callbacks can be installed for handling returned messages,
channel.get().onSize() has a different behavior: it now reports the message size (and no longer the queue size),
channel.get().onCount() has been added: it reports the queue size (this used to be the onSize() method),
channel.consume().onSize() method has been added to find out the size of the upcoming message
This commit is contained in:
Emiel Bruijntjes 2018-03-01 21:12:53 +01:00
parent f39df772d3
commit 1f3500cee8
14 changed files with 345 additions and 142 deletions

View File

@ -3,7 +3,7 @@
*
* Class storing deferred callbacks of different type.
*
* @copyright 2014 - 2017 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -29,25 +29,53 @@ class Message;
class MetaData;
/**
* All the callbacks that are supported
*
* When someone registers a callback function for certain events, it should
* match one of the following signatures.
* Generic callbacks that are used by many deferred objects
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using EmptyCallback = std::function<void()>;
using BeginCallback = std::function<void(const std::string &exchange, const std::string &routingkey)>;
using HeaderCallback = std::function<void(const MetaData &metaData)>;
using DataCallback = std::function<void(const char *data, size_t size)>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using CompleteCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using SizeCallback = std::function<void(uint32_t messagecount)>;
using ConsumeCallback = std::function<void(const std::string &consumer)>;
using CancelCallback = std::function<void(const std::string &consumer)>;
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
/**
* Declaring and deleting a queue
*/
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
/**
* When retrieving the size of a queue in some way
*/
using EmptyCallback = std::function<void()>;
using SizeCallback = std::function<void(uint32_t messagecount)>;
/**
* Starting and stopping a consumer
*/
using ConsumeCallback = std::function<void(const std::string &consumer)>;
using CancelCallback = std::function<void(const std::string &consumer)>;
/**
* Receiving messages, either via consume(), get() or as returned messages
* The following methods receive the returned message in multiple parts
*/
using StartCallback = std::function<void(const std::string &exchange, const std::string &routingkey)>;
using HeaderCallback = std::function<void(const MetaData &metaData)>;
using DataCallback = std::function<void(const char *data, size_t size)>;
using DeliveredCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
/**
* For returned messages amqp-cpp first calls a return-callback before the start,
* header and data callbacks are called. Instead of the deliver-callback, a
* returned-callback is called.
*/
using ReturnCallback = std::function<void(int16_t code, const std::string &message)>;
using ReturnedCallback = std::function<void()>;
/**
* If you do not want to merge all data into a single string, you can als
* implement callbacks that return the collected message.
*/
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using BounceCallback = std::function<void(const Message &message, int16_t code, const std::string &description)>;
/**
* End namespace

View File

@ -341,17 +341,31 @@ public:
/**
* Publish a message to an exchange
*
*
* This method returns a reference to a DeferredPublisher object. You can use this returned
* object to install callbacks that are called when an undeliverable message is returned, or
* to set the callback that is called when the server confirms that the message was received.
*
* To enable handling returned messages, or to enable publisher-confirms, you must not only
* set the callback, but also pass in appropriate flags to enable this feature. If you do not
* pass in these flags, your callbacks will not be called. If you are not at all interested
* in returned messages or publish-confirms, you can ignore the flag and the returned object.
*
* Watch out: the channel returns the same DeferredPublisher object for all calls to the
* publish() method. This means that the callbacks that you install for the first published
* message are also used for subsequent messages _and_ it means that if you install a different
* callback for a later publish operation, it overwrites your earlier callbacks
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); }
/**
* Set the Quality of Service (QOS) for this channel

View File

@ -44,6 +44,7 @@ class DeferredDelete;
class DeferredCancel;
class DeferredQueue;
class DeferredGet;
class DeferredPublisher;
class Connection;
class Envelope;
class Table;
@ -73,6 +74,12 @@ private:
*/
ErrorCallback _errorCallback;
/**
* Handler that deals with incoming messages as a result of publish operations
* @var std::shared_ptr<DeferredPublisher>
*/
std::shared_ptr<DeferredPublisher> _publisher;
/**
* Handlers for all consumers that are active
* @var std::map<std::string,std::shared_ptr<DeferredConsumer>
@ -396,16 +403,16 @@ public:
* Publish a message to an exchange
*
* If the mandatory or immediate flag is set, and the message could not immediately
* be published, the message will be returned to the client. However, the AMQP-CPP
* library does not yet report such returned messages.
* be published, the message will be returned to the client.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @return DeferredPublisher
*/
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope);
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope);
/**
* Set the Quality of Service (QOS) of the entire connection

View File

@ -68,8 +68,7 @@ private:
public:
/**
* Protected constructor that can only be called
* from within the channel implementation
* Constructor that should only be called from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
@ -167,15 +166,46 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onBegin(const BeginCallback &callback)
DeferredConsumer &onBegin(const StartCallback &callback)
{
// store callback
_beginCallback = callback;
_startCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function that is called when the start frame of a new
* consumed message is received
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onStart(const StartCallback &callback)
{
// store callback
_startCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function that is called when the message size is known
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredConsumer &onSize(const SizeCallback &callback)
{
// store callback
_sizeCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function that is called when message headers come in
*
@ -219,10 +249,25 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onComplete(const CompleteCallback &callback)
DeferredConsumer &onComplete(const DeliveredCallback &callback)
{
// store callback
_completeCallback = callback;
_deliveredCallback = callback;
// allow chaining
return *this;
}
/**
* Register a funtion to be called when a message was completely received
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onDelivered(const DeliveredCallback &callback)
{
// store callback
_deliveredCallback = callback;
// allow chaining
return *this;

View File

@ -50,6 +50,12 @@ protected:
*/
MessageCallback _messageCallback;
/**
* Callback for when a message was complete finished
* @var DeliveredCallback
*/
DeliveredCallback _deliveredCallback;
/**
* Initialize the object to send out a message

View File

@ -40,7 +40,7 @@ private:
* Callback with the number of messages still in the queue
* @var SizeCallback
*/
SizeCallback _sizeCallback;
SizeCallback _countCallback;
/**
* Report success for a get operation
@ -90,58 +90,6 @@ public:
DeferredExtReceiver(failed, channel) {}
public:
/**
* Register the function to be called when a new message is expected
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredGet &onBegin(const BeginCallback &callback)
{
// store callback
_beginCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when message headers come in
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredGet &onHeaders(const HeaderCallback &callback)
{
// store callback
_headerCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when a chunk of data comes in
*
* Note that this function may be called zero, one or multiple times
* for each incoming message depending on the size of the message data.
*
* If you install this callback you very likely also want to install
* the onComplete callback so you know when the last data part was
* received.
*
* @param callback The callback to invoke for chunks of message data
* @return Same object for chaining
*/
DeferredGet &onData(const DataCallback &callback)
{
// store callback
_dataCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
@ -198,13 +146,95 @@ public:
}
/**
* Register a function to be called when size information is known
* Register a function to be called when queue size information is known
* @param callback the callback to execute
*/
DeferredGet &onCount(const SizeCallback &callback)
{
// store callback
_countCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when a new message is expected
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredGet &onBegin(const StartCallback &callback)
{
// store callback
_startCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when a new message is expected
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredGet &onStart(const StartCallback &callback)
{
// store callback
_startCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function that is called when the message size is known
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredGet &onSize(const SizeCallback &callback)
{
// store callback
_sizeCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when message headers come in
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredGet &onHeaders(const HeaderCallback &callback)
{
// store callback
_headerCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when a chunk of data comes in
*
* Note that this function may be called zero, one or multiple times
* for each incoming message depending on the size of the message data.
*
* If you install this callback you very likely also want to install
* the onComplete callback so you know when the last data part was
* received.
*
* @param callback The callback to invoke for chunks of message data
* @return Same object for chaining
*/
DeferredGet &onData(const DataCallback &callback)
{
// store callback
_dataCallback = callback;
// allow chaining
return *this;
@ -216,10 +246,25 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredGet &onComplete(const CompleteCallback &callback)
DeferredGet &onComplete(const DeliveredCallback &callback)
{
// store callback
_completeCallback = callback;
_deliveredCallback = callback;
// allow chaining
return *this;
}
/**
* Register a funtion to be called when a message was completely received
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredGet &onDelivered(const DeliveredCallback &callback)
{
// store callback
_deliveredCallback = callback;
// allow chaining
return *this;

View File

@ -18,14 +18,58 @@
* Begin of namespace
*/
namespace AMQP {
/**
* Forward declarations
*/
class ChannelImpl;
/**
* Class definition
*/
class DeferredPublisher : public DeferredReceiver
class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this<DeferredPublisher>
{
private:
/**
* The error code
* @var int16_t
*/
int16_t _code = 0;
/**
* The error message
* @var std::string
*/
std::string _description;
/**
* Callback that is called when a message is returned
* @var BounceCallback
*/
BounceCallback _bounceCallback;
/**
* Begin of a bounced message
* @var ReturnCallback
*/
ReturnCallback _beginCallback;
/**
* End of a bounced message
* @var ReturnedCallback
*/
ReturnedCallback _completeCallback;
/**
* Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr
*/
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
/**
* Extended implementation of the complete method that is called when a message was fully received
*/
virtual void complete() override;
public:
/**
@ -38,7 +82,7 @@ public:
* @param channel the channel implementation
* @param failed are we already failed?
*/
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
DeferredPublisher(ChannelImpl *channel, bool failed = false) :
DeferredReceiver(failed, channel) {}
public:
@ -46,10 +90,10 @@ public:
* Register a function to be called when a full message is returned
* @param callback the callback to execute
*/
DeferredConsumer &onReceived(const ReturnCallback &callback)
DeferredPublisher &onReceived(const BounceCallback &callback)
{
// store callback
_returnCallback = callback;
_bounceCallback = callback;
// allow chaining
return *this;
@ -59,10 +103,10 @@ public:
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredConsumer &onMessage(const ReturnCallback &callback)
DeferredPublisher &onMessage(const BounceCallback &callback)
{
// store callback
_returnCallback = callback;
_bounceCallback = callback;
// allow chaining
return *this;
@ -72,36 +116,27 @@ public:
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredConsumer &onReturned(const ReturnCallback &callback)
DeferredPublisher &onReturned(const BounceCallback &callback)
{
// store callback
_returnCallback = callback;
_bounceCallback = callback;
// allow chaining
return *this;
}
/**
* RabbitMQ sends a message in multiple frames to its consumers.
* The AMQP-CPP library collects these frames and merges them into a
* single AMQP::Message object that is passed to the callback that
* you can set with the onReceived() or onMessage() methods (see above).
*
* However, you can also write your own algorithm to merge the frames.
* In that case you can install callbacks to handle the frames. Every
* message is sent in a number of frames:
*
* - a begin frame that marks the start of the message
* - an optional header if the message was sent with an envelope
* - zero or more data frames (usually 1, but more for large messages)
* - an end frame to mark the end of the message.
*
* To install handlers for these frames, you can use the onBegin(),
* onHeaders(), onData() and onComplete() methods.
*
* If you just rely on the onReceived() or onMessage() callbacks, you
* do not need any of the methods below this line.
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredPublisher &onBounced(const BounceCallback &callback)
{
// store callback
_bounceCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function that is called when the start frame of a new
@ -110,7 +145,7 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onBegin(const BeginCallback &callback)
DeferredPublisher &onBegin(const ReturnCallback &callback)
{
// store callback
_beginCallback = callback;
@ -119,13 +154,28 @@ public:
return *this;
}
/**
* Register a function that is called when the message size is known
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredPublisher &onSize(const SizeCallback &callback)
{
// store callback
_sizeCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function that is called when message headers come in
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredConsumer &onHeaders(const HeaderCallback &callback)
DeferredPublisher &onHeaders(const HeaderCallback &callback)
{
// store callback
_headerCallback = callback;
@ -147,7 +197,7 @@ public:
* @param callback The callback to invoke for chunks of message data
* @return Same object for chaining
*/
DeferredConsumer &onData(const DataCallback &callback)
DeferredPublisher &onData(const DataCallback &callback)
{
// store callback
_dataCallback = callback;
@ -162,7 +212,7 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumer &onComplete(const CompleteCallback &callback)
DeferredPublisher &onComplete(const ReturnedCallback &callback)
{
// store callback
_completeCallback = callback;

View File

@ -96,9 +96,15 @@ protected:
/**
* Callback for new message
* @var BeginCallback
* @var StartCallback
*/
BeginCallback _beginCallback;
StartCallback _startCallback;
/**
* Callback that is called when size of the message is known
* @var SizeCallback
*/
SizeCallback _sizeCallback;
/**
* Callback for incoming headers
@ -112,12 +118,6 @@ protected:
*/
DataCallback _dataCallback;
/**
* Callback for when a message was complete finished
* @var CompleteCallback
*/
CompleteCallback _completeCallback;
/**
* The message that we are currently receiving
* @var stack_ptr<Message>

View File

@ -451,26 +451,30 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @return DeferredPublisher
*/
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
{
// we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that
Monitor monitor(this);
// @todo do not copy the entire buffer to individual frames
// make sure we have a deferred object to return
if (!_publisher) _publisher.reset(new DeferredPublisher(this));
// send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false;
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher;
// channel still valid?
if (!monitor.valid()) return false;
if (!monitor.valid()) return *_publisher;
// send header
if (!send(BasicHeaderFrame(_id, envelope))) return false;
if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher;
// channel and connection still valid?
if (!monitor.valid() || !_connection) return false;
if (!monitor.valid() || !_connection) return *_publisher;
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->maxPayload();
@ -487,10 +491,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
// send out a body frame
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false;
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher;
// channel still valid?
if (!monitor.valid()) return false;
if (!monitor.valid()) return *_publisher;
// update counters
bytessent += chunksize;
@ -498,7 +502,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
}
// done
return true;
return *_publisher;
}
/**

View File

@ -26,7 +26,7 @@ namespace AMQP {
void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey)
{
// call base
DeferredExtReceiver::initialize(exchange, routingkey);
DeferredReceiver::initialize(exchange, routingkey);
// do we have anybody interested in messages? in that case we construct the message
if (_messageCallback) _message.construct(exchange, routingkey);
@ -44,7 +44,7 @@ void DeferredExtReceiver::complete()
if (_message) _messageCallback(*_message, _deliveryTag, _redelivered);
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
if (_deliveredCallback) _deliveredCallback(_deliveryTag, _redelivered);
// for the next iteration we want a new message
_message.reset();

View File

@ -34,7 +34,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecoun
// report the size (note that this is the size _minus_ the message that is retrieved
// (and for which the callback will be called later), so it could be zero)
if (_sizeCallback) _sizeCallback(messagecount);
if (_countCallback) _countCallback(messagecount);
// return next handler
return _next;
@ -47,7 +47,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecoun
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
{
// report the size
if (_sizeCallback) _sizeCallback(0);
if (_countCallback) _countCallback(0);
// check if a callback was set
if (_emptyCallback) _emptyCallback();

View File

@ -22,7 +22,7 @@ void DeferredPublisher::complete()
Monitor monitor(_channel);
// do we have a message?
if (_message) _bounceCallback(*_message, 0, "");
if (_message) _bounceCallback(*_message, _code, _description);
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback();

View File

@ -28,7 +28,7 @@ namespace AMQP {
void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey)
{
// anybody interested in the new message?
if (_beginCallback) _beginCallback(exchange, routingkey);
if (_startCallback) _startCallback(exchange, routingkey);
}
/**
@ -43,6 +43,9 @@ void DeferredReceiver::process(BasicHeaderFrame &frame)
// store the body size
_bodySize = frame.bodySize();
// is user interested in the size?
if (_sizeCallback) _sizeCallback(_bodySize);
// do we have a message?
if (_message)

View File

@ -68,6 +68,7 @@
#include "amqpcpp/callbacks.h"
#include "amqpcpp/deferred.h"
#include "amqpcpp/deferredconsumer.h"
#include "amqpcpp/deferredpublisher.h"
#include "amqpcpp/deferredqueue.h"
#include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h"