diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index 67e3757..a2d5ec4 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -3,7 +3,7 @@ * * Class storing deferred callbacks of different type. * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -29,25 +29,53 @@ class Message; class MetaData; /** - * All the callbacks that are supported - * - * When someone registers a callback function for certain events, it should - * match one of the following signatures. + * Generic callbacks that are used by many deferred objects */ -using SuccessCallback = std::function; -using ErrorCallback = std::function; -using FinalizeCallback = std::function; -using EmptyCallback = std::function; -using BeginCallback = std::function; -using HeaderCallback = std::function; -using DataCallback = std::function; -using MessageCallback = std::function; -using CompleteCallback = std::function; -using QueueCallback = std::function; -using DeleteCallback = std::function; -using SizeCallback = std::function; -using ConsumeCallback = std::function; -using CancelCallback = std::function; +using SuccessCallback = std::function; +using ErrorCallback = std::function; +using FinalizeCallback = std::function; + +/** + * Declaring and deleting a queue + */ +using QueueCallback = std::function; +using DeleteCallback = std::function; + +/** + * When retrieving the size of a queue in some way + */ +using EmptyCallback = std::function; +using SizeCallback = std::function; + +/** + * Starting and stopping a consumer + */ +using ConsumeCallback = std::function; +using CancelCallback = std::function; + +/** + * Receiving messages, either via consume(), get() or as returned messages + * The following methods receive the returned message in multiple parts + */ +using StartCallback = std::function; +using HeaderCallback = std::function; +using DataCallback = std::function; +using DeliveredCallback = std::function; + +/** + * For returned messages amqp-cpp first calls a return-callback before the start, + * header and data callbacks are called. Instead of the deliver-callback, a + * returned-callback is called. + */ +using ReturnCallback = std::function; +using ReturnedCallback = std::function; + +/** + * If you do not want to merge all data into a single string, you can als + * implement callbacks that return the collected message. + */ +using MessageCallback = std::function; +using BounceCallback = std::function; /** * End namespace diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index c41e0aa..2be43cd 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -341,17 +341,31 @@ public: /** * Publish a message to an exchange - * + * + * This method returns a reference to a DeferredPublisher object. You can use this returned + * object to install callbacks that are called when an undeliverable message is returned, or + * to set the callback that is called when the server confirms that the message was received. + * + * To enable handling returned messages, or to enable publisher-confirms, you must not only + * set the callback, but also pass in appropriate flags to enable this feature. If you do not + * pass in these flags, your callbacks will not be called. If you are not at all interested + * in returned messages or publish-confirms, you can ignore the flag and the returned object. + * + * Watch out: the channel returns the same DeferredPublisher object for all calls to the + * publish() method. This means that the callbacks that you install for the first published + * message are also used for subsequent messages _and_ it means that if you install a different + * callback for a later publish operation, it overwrites your earlier callbacks + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 3f1577e..41d5742 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -44,6 +44,7 @@ class DeferredDelete; class DeferredCancel; class DeferredQueue; class DeferredGet; +class DeferredPublisher; class Connection; class Envelope; class Table; @@ -73,6 +74,12 @@ private: */ ErrorCallback _errorCallback; + /** + * Handler that deals with incoming messages as a result of publish operations + * @var std::shared_ptr + */ + std::shared_ptr _publisher; + /** * Handlers for all consumers that are active * @var std::map @@ -396,16 +403,16 @@ public: * Publish a message to an exchange * * If the mandatory or immediate flag is set, and the message could not immediately - * be published, the message will be returned to the client. However, the AMQP-CPP - * library does not yet report such returned messages. + * be published, the message will be returned to the client. * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @return DeferredPublisher */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); /** * Set the Quality of Service (QOS) of the entire connection diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index bd5e576..c8030e7 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -68,8 +68,7 @@ private: public: /** - * Protected constructor that can only be called - * from within the channel implementation + * Constructor that should only be called from within the channel implementation * * Note: this constructor _should_ be protected, but because make_shared * will then not work, we have decided to make it public after all, @@ -167,15 +166,46 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onBegin(const BeginCallback &callback) + DeferredConsumer &onBegin(const StartCallback &callback) { // store callback - _beginCallback = callback; + _startCallback = callback; // allow chaining return *this; } + /** + * Register the function that is called when the start frame of a new + * consumed message is received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onStart(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredConsumer &onSize(const SizeCallback &callback) + { + // store callback + _sizeCallback = callback; + + // allow chaining + return *this; + } + /** * Register the function that is called when message headers come in * @@ -219,10 +249,25 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onComplete(const CompleteCallback &callback) + DeferredConsumer &onComplete(const DeliveredCallback &callback) { // store callback - _completeCallback = callback; + _deliveredCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onDelivered(const DeliveredCallback &callback) + { + // store callback + _deliveredCallback = callback; // allow chaining return *this; diff --git a/include/amqpcpp/deferredextreceiver.h b/include/amqpcpp/deferredextreceiver.h index af15bfe..3d427e3 100644 --- a/include/amqpcpp/deferredextreceiver.h +++ b/include/amqpcpp/deferredextreceiver.h @@ -50,6 +50,12 @@ protected: */ MessageCallback _messageCallback; + /** + * Callback for when a message was complete finished + * @var DeliveredCallback + */ + DeliveredCallback _deliveredCallback; + /** * Initialize the object to send out a message diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4ea8f1f..78332be 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -40,7 +40,7 @@ private: * Callback with the number of messages still in the queue * @var SizeCallback */ - SizeCallback _sizeCallback; + SizeCallback _countCallback; /** * Report success for a get operation @@ -90,58 +90,6 @@ public: DeferredExtReceiver(failed, channel) {} public: - /** - * Register the function to be called when a new message is expected - * - * @param callback The callback to invoke - * @return Same object for chaining - */ - DeferredGet &onBegin(const BeginCallback &callback) - { - // store callback - _beginCallback = callback; - - // allow chaining - return *this; - } - - /** - * Register the function to be called when message headers come in - * - * @param callback The callback to invoke for message headers - * @return Same object for chaining - */ - DeferredGet &onHeaders(const HeaderCallback &callback) - { - // store callback - _headerCallback = callback; - - // allow chaining - return *this; - } - - /** - * Register the function to be called when a chunk of data comes in - * - * Note that this function may be called zero, one or multiple times - * for each incoming message depending on the size of the message data. - * - * If you install this callback you very likely also want to install - * the onComplete callback so you know when the last data part was - * received. - * - * @param callback The callback to invoke for chunks of message data - * @return Same object for chaining - */ - DeferredGet &onData(const DataCallback &callback) - { - // store callback - _dataCallback = callback; - - // allow chaining - return *this; - } - /** * Register a function to be called when a message arrives * This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it @@ -198,13 +146,95 @@ public: } /** - * Register a function to be called when size information is known + * Register a function to be called when queue size information is known * @param callback the callback to execute */ + DeferredGet &onCount(const SizeCallback &callback) + { + // store callback + _countCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a new message is expected + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onBegin(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a new message is expected + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onStart(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ DeferredGet &onSize(const SizeCallback &callback) { // store callback _sizeCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when message headers come in + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredGet &onHeaders(const HeaderCallback &callback) + { + // store callback + _headerCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a chunk of data comes in + * + * Note that this function may be called zero, one or multiple times + * for each incoming message depending on the size of the message data. + * + * If you install this callback you very likely also want to install + * the onComplete callback so you know when the last data part was + * received. + * + * @param callback The callback to invoke for chunks of message data + * @return Same object for chaining + */ + DeferredGet &onData(const DataCallback &callback) + { + // store callback + _dataCallback = callback; // allow chaining return *this; @@ -216,10 +246,25 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredGet &onComplete(const CompleteCallback &callback) + DeferredGet &onComplete(const DeliveredCallback &callback) { // store callback - _completeCallback = callback; + _deliveredCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onDelivered(const DeliveredCallback &callback) + { + // store callback + _deliveredCallback = callback; // allow chaining return *this; diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredpublisher.h index 04f151c..77c2465 100644 --- a/include/amqpcpp/deferredpublisher.h +++ b/include/amqpcpp/deferredpublisher.h @@ -18,14 +18,58 @@ * Begin of namespace */ namespace AMQP { + +/** + * Forward declarations + */ +class ChannelImpl; /** * Class definition */ -class DeferredPublisher : public DeferredReceiver +class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this { private: + /** + * The error code + * @var int16_t + */ + int16_t _code = 0; + + /** + * The error message + * @var std::string + */ + std::string _description; + /** + * Callback that is called when a message is returned + * @var BounceCallback + */ + BounceCallback _bounceCallback; + + /** + * Begin of a bounced message + * @var ReturnCallback + */ + ReturnCallback _beginCallback; + + /** + * End of a bounced message + * @var ReturnedCallback + */ + ReturnedCallback _completeCallback; + + /** + * Get reference to self to prevent that object falls out of scope + * @return std::shared_ptr + */ + virtual std::shared_ptr lock() override { return shared_from_this(); } + + /** + * Extended implementation of the complete method that is called when a message was fully received + */ + virtual void complete() override; public: /** @@ -38,7 +82,7 @@ public: * @param channel the channel implementation * @param failed are we already failed? */ - DeferredConsumer(ChannelImpl *channel, bool failed = false) : + DeferredPublisher(ChannelImpl *channel, bool failed = false) : DeferredReceiver(failed, channel) {} public: @@ -46,10 +90,10 @@ public: * Register a function to be called when a full message is returned * @param callback the callback to execute */ - DeferredConsumer &onReceived(const ReturnCallback &callback) + DeferredPublisher &onReceived(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; @@ -59,10 +103,10 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredConsumer &onMessage(const ReturnCallback &callback) + DeferredPublisher &onMessage(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; @@ -72,36 +116,27 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredConsumer &onReturned(const ReturnCallback &callback) + DeferredPublisher &onReturned(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; } /** - * RabbitMQ sends a message in multiple frames to its consumers. - * The AMQP-CPP library collects these frames and merges them into a - * single AMQP::Message object that is passed to the callback that - * you can set with the onReceived() or onMessage() methods (see above). - * - * However, you can also write your own algorithm to merge the frames. - * In that case you can install callbacks to handle the frames. Every - * message is sent in a number of frames: - * - * - a begin frame that marks the start of the message - * - an optional header if the message was sent with an envelope - * - zero or more data frames (usually 1, but more for large messages) - * - an end frame to mark the end of the message. - * - * To install handlers for these frames, you can use the onBegin(), - * onHeaders(), onData() and onComplete() methods. - * - * If you just rely on the onReceived() or onMessage() callbacks, you - * do not need any of the methods below this line. + * Alias for onReceived() (see above) + * @param callback the callback to execute */ + DeferredPublisher &onBounced(const BounceCallback &callback) + { + // store callback + _bounceCallback = callback; + + // allow chaining + return *this; + } /** * Register the function that is called when the start frame of a new @@ -110,7 +145,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onBegin(const BeginCallback &callback) + DeferredPublisher &onBegin(const ReturnCallback &callback) { // store callback _beginCallback = callback; @@ -119,13 +154,28 @@ public: return *this; } + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredPublisher &onSize(const SizeCallback &callback) + { + // store callback + _sizeCallback = callback; + + // allow chaining + return *this; + } + /** * Register the function that is called when message headers come in * * @param callback The callback to invoke for message headers * @return Same object for chaining */ - DeferredConsumer &onHeaders(const HeaderCallback &callback) + DeferredPublisher &onHeaders(const HeaderCallback &callback) { // store callback _headerCallback = callback; @@ -147,7 +197,7 @@ public: * @param callback The callback to invoke for chunks of message data * @return Same object for chaining */ - DeferredConsumer &onData(const DataCallback &callback) + DeferredPublisher &onData(const DataCallback &callback) { // store callback _dataCallback = callback; @@ -162,7 +212,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onComplete(const CompleteCallback &callback) + DeferredPublisher &onComplete(const ReturnedCallback &callback) { // store callback _completeCallback = callback; diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h index e4d4562..caa489b 100644 --- a/include/amqpcpp/deferredreceiver.h +++ b/include/amqpcpp/deferredreceiver.h @@ -96,9 +96,15 @@ protected: /** * Callback for new message - * @var BeginCallback + * @var StartCallback */ - BeginCallback _beginCallback; + StartCallback _startCallback; + + /** + * Callback that is called when size of the message is known + * @var SizeCallback + */ + SizeCallback _sizeCallback; /** * Callback for incoming headers @@ -112,12 +118,6 @@ protected: */ DataCallback _dataCallback; - /** - * Callback for when a message was complete finished - * @var CompleteCallback - */ - CompleteCallback _completeCallback; - /** * The message that we are currently receiving * @var stack_ptr diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 95ae8f8..fff8873 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -451,26 +451,30 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @return DeferredPublisher */ -bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) +DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that Monitor monitor(this); // @todo do not copy the entire buffer to individual frames + + // make sure we have a deferred object to return + if (!_publisher) _publisher.reset(new DeferredPublisher(this)); // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false; + if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher; // channel still valid? - if (!monitor.valid()) return false; + if (!monitor.valid()) return *_publisher; // send header - if (!send(BasicHeaderFrame(_id, envelope))) return false; + if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher; // channel and connection still valid? - if (!monitor.valid() || !_connection) return false; + if (!monitor.valid() || !_connection) return *_publisher; // the max payload size is the max frame size minus the bytes for headers and trailer uint32_t maxpayload = _connection->maxPayload(); @@ -487,10 +491,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); // send out a body frame - if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false; + if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher; // channel still valid? - if (!monitor.valid()) return false; + if (!monitor.valid()) return *_publisher; // update counters bytessent += chunksize; @@ -498,7 +502,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin } // done - return true; + return *_publisher; } /** diff --git a/src/deferredextreceiver.cpp b/src/deferredextreceiver.cpp index ca05185..0f2f047 100644 --- a/src/deferredextreceiver.cpp +++ b/src/deferredextreceiver.cpp @@ -26,7 +26,7 @@ namespace AMQP { void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey) { // call base - DeferredExtReceiver::initialize(exchange, routingkey); + DeferredReceiver::initialize(exchange, routingkey); // do we have anybody interested in messages? in that case we construct the message if (_messageCallback) _message.construct(exchange, routingkey); @@ -44,7 +44,7 @@ void DeferredExtReceiver::complete() if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + if (_deliveredCallback) _deliveredCallback(_deliveryTag, _redelivered); // for the next iteration we want a new message _message.reset(); diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 84a2337..e8985f1 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -34,7 +34,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecoun // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero) - if (_sizeCallback) _sizeCallback(messagecount); + if (_countCallback) _countCallback(messagecount); // return next handler return _next; @@ -47,7 +47,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecoun const std::shared_ptr &DeferredGet::reportSuccess() const { // report the size - if (_sizeCallback) _sizeCallback(0); + if (_countCallback) _countCallback(0); // check if a callback was set if (_emptyCallback) _emptyCallback(); diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index 61af52a..72843f1 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -22,7 +22,7 @@ void DeferredPublisher::complete() Monitor monitor(_channel); // do we have a message? - if (_message) _bounceCallback(*_message, 0, ""); + if (_message) _bounceCallback(*_message, _code, _description); // do we have to inform anyone about completion? if (_completeCallback) _completeCallback(); diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp index 251258a..f4e246c 100644 --- a/src/deferredreceiver.cpp +++ b/src/deferredreceiver.cpp @@ -28,7 +28,7 @@ namespace AMQP { void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey) { // anybody interested in the new message? - if (_beginCallback) _beginCallback(exchange, routingkey); + if (_startCallback) _startCallback(exchange, routingkey); } /** @@ -43,6 +43,9 @@ void DeferredReceiver::process(BasicHeaderFrame &frame) // store the body size _bodySize = frame.bodySize(); + + // is user interested in the size? + if (_sizeCallback) _sizeCallback(_bodySize); // do we have a message? if (_message) diff --git a/src/includes.h b/src/includes.h index 122f2d0..89ae392 100644 --- a/src/includes.h +++ b/src/includes.h @@ -68,6 +68,7 @@ #include "amqpcpp/callbacks.h" #include "amqpcpp/deferred.h" #include "amqpcpp/deferredconsumer.h" +#include "amqpcpp/deferredpublisher.h" #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h"