Merge remote-tracking branch 'upstream/master' into hunter
This commit is contained in:
commit
e3c6c9df41
4
Makefile
4
Makefile
|
|
@ -2,8 +2,8 @@ PREFIX ?= /usr
|
||||||
INCLUDE_DIR = ${PREFIX}/include
|
INCLUDE_DIR = ${PREFIX}/include
|
||||||
LIBRARY_DIR = ${PREFIX}/lib
|
LIBRARY_DIR = ${PREFIX}/lib
|
||||||
export LIBRARY_NAME = amqpcpp
|
export LIBRARY_NAME = amqpcpp
|
||||||
export SONAME = 2.8
|
export SONAME = 3.0
|
||||||
export VERSION = 2.8.0
|
export VERSION = 3.0.0
|
||||||
|
|
||||||
all:
|
all:
|
||||||
$(MAKE) -C src all
|
$(MAKE) -C src all
|
||||||
|
|
|
||||||
65
README.md
65
README.md
|
|
@ -799,44 +799,54 @@ in almost any form:
|
||||||
````c++
|
````c++
|
||||||
/**
|
/**
|
||||||
* Publish a message to an exchange
|
* Publish a message to an exchange
|
||||||
*
|
*
|
||||||
* The following flags can be used
|
* You have to supply the name of an exchange and a routing key. RabbitMQ will
|
||||||
*
|
* then try to send the message to one or more queues. With the optional flags
|
||||||
* - mandatory if set, an unroutable message will be sent back to
|
* parameter you can specify what should happen if the message could not be routed
|
||||||
* the client (currently not supported)
|
* to a queue. By default, unroutable message are silently discarded.
|
||||||
*
|
*
|
||||||
* - immediate if set, a message that could not immediately be consumed
|
* This method returns a reference to a DeferredPublisher object. You can use
|
||||||
* is returned to the client (currently not supported)
|
* this returned object to install callbacks that are called when an undeliverable
|
||||||
*
|
* message is returned, or to set the callback that is called when the server
|
||||||
* If either of the two flags is set, and the message could not immediately
|
* confirms that the message was received.
|
||||||
* be published, the message is returned by the server to the client. However,
|
*
|
||||||
* at this moment in time, the AMQP-CPP library does not support catching
|
* To enable handling returned messages, or to enable publisher-confirms, you must
|
||||||
* such returned messages.
|
* not only set the callback, but also pass in appropriate flags to enable this
|
||||||
*
|
* feature. If you do not pass in these flags, your callbacks will not be called.
|
||||||
|
* If you are not at all interested in returned messages or publish-confirms, you
|
||||||
|
* can ignore the flag and the returned object.
|
||||||
|
*
|
||||||
|
* Watch out: the channel returns _the same_ DeferredPublisher object for all
|
||||||
|
* calls to the publish() method. This means that the callbacks that you install
|
||||||
|
* for the first published message are also used for subsequent messages _and_
|
||||||
|
* it means that if you install a different callback for a later publish
|
||||||
|
* operation, it overwrites your earlier callbacks
|
||||||
|
*
|
||||||
|
* The following flags can be supplied:
|
||||||
|
*
|
||||||
|
* - mandatory If set, server returns messages that are not sent to a queue
|
||||||
|
* - immediate If set, server returns messages that can not immediately be forwarded to a consumer.
|
||||||
|
*
|
||||||
* @param exchange the exchange to publish to
|
* @param exchange the exchange to publish to
|
||||||
* @param routingkey the routing key
|
* @param routingkey the routing key
|
||||||
* @param flags optional flags (see above)
|
|
||||||
* @param envelope the full envelope to send
|
* @param envelope the full envelope to send
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
*/
|
*/
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const AMQP::Envelope &envelope);
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const AMQP::Envelope &envelope);
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message);
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message);
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size);
|
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size);
|
|
||||||
````
|
````
|
||||||
|
|
||||||
Published messages are normally not confirmed by the server, and the RabbitMQ
|
Published messages are normally not confirmed by the server, and the RabbitMQ
|
||||||
will not send a report back to inform you whether the message was succesfully
|
will not send a report back to inform you whether the message was succesfully
|
||||||
published or not. Therefore the publish method does not return a Deferred
|
published or not. But with the flags you can instruct RabbitMQ to send back
|
||||||
object.
|
the message if it was undeliverable.
|
||||||
|
|
||||||
As long as no error is reported via the Channel::onError() method, you can safely
|
You can also use transactions to ensure that your messages get delivered.
|
||||||
assume that your messages were delivered.
|
Let's say that you are publishing many messages in a row. If you get
|
||||||
|
|
||||||
This can of course be a problem when you are publishing many messages. If you get
|
|
||||||
an error halfway through there is no way to know for sure how many messages made
|
an error halfway through there is no way to know for sure how many messages made
|
||||||
it to the broker and how many should be republished. If this is important, you can
|
it to the broker and how many should be republished. If this is important, you can
|
||||||
wrap the publish commands inside a transaction. In this case, if an error occurs,
|
wrap the publish commands inside a transaction. In this case, if an error occurs,
|
||||||
|
|
@ -1002,7 +1012,6 @@ need additional attention:
|
||||||
- ability to set up secure connections (or is this fully done on the IO level)
|
- ability to set up secure connections (or is this fully done on the IO level)
|
||||||
- login with other protocols than login/password
|
- login with other protocols than login/password
|
||||||
- publish confirms
|
- publish confirms
|
||||||
- returned messages
|
|
||||||
|
|
||||||
We also need to add more safety checks so that strange or invalid data from
|
We also need to add more safety checks so that strange or invalid data from
|
||||||
RabbitMQ does not break the library (although in reality RabbitMQ only sends
|
RabbitMQ does not break the library (although in reality RabbitMQ only sends
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,7 @@
|
||||||
#include "amqpcpp/deferreddelete.h"
|
#include "amqpcpp/deferreddelete.h"
|
||||||
#include "amqpcpp/deferredcancel.h"
|
#include "amqpcpp/deferredcancel.h"
|
||||||
#include "amqpcpp/deferredget.h"
|
#include "amqpcpp/deferredget.h"
|
||||||
|
#include "amqpcpp/deferredpublisher.h"
|
||||||
#include "amqpcpp/channelimpl.h"
|
#include "amqpcpp/channelimpl.h"
|
||||||
#include "amqpcpp/channel.h"
|
#include "amqpcpp/channel.h"
|
||||||
#include "amqpcpp/login.h"
|
#include "amqpcpp/login.h"
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
*
|
*
|
||||||
* Class storing deferred callbacks of different type.
|
* Class storing deferred callbacks of different type.
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -29,25 +29,53 @@ class Message;
|
||||||
class MetaData;
|
class MetaData;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All the callbacks that are supported
|
* Generic callbacks that are used by many deferred objects
|
||||||
*
|
|
||||||
* When someone registers a callback function for certain events, it should
|
|
||||||
* match one of the following signatures.
|
|
||||||
*/
|
*/
|
||||||
using SuccessCallback = std::function<void()>;
|
using SuccessCallback = std::function<void()>;
|
||||||
using ErrorCallback = std::function<void(const char *message)>;
|
using ErrorCallback = std::function<void(const char *message)>;
|
||||||
using FinalizeCallback = std::function<void()>;
|
using FinalizeCallback = std::function<void()>;
|
||||||
using EmptyCallback = std::function<void()>;
|
|
||||||
using BeginCallback = std::function<void()>;
|
/**
|
||||||
using HeaderCallback = std::function<void(const MetaData &metaData)>;
|
* Declaring and deleting a queue
|
||||||
using DataCallback = std::function<void(const char *data, size_t size)>;
|
*/
|
||||||
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
|
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
|
||||||
using CompleteCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
|
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
|
||||||
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
|
|
||||||
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
|
/**
|
||||||
using SizeCallback = std::function<void(uint32_t messagecount)>;
|
* When retrieving the size of a queue in some way
|
||||||
using ConsumeCallback = std::function<void(const std::string &consumer)>;
|
*/
|
||||||
using CancelCallback = std::function<void(const std::string &consumer)>;
|
using EmptyCallback = std::function<void()>;
|
||||||
|
using SizeCallback = std::function<void(uint32_t messagecount)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starting and stopping a consumer
|
||||||
|
*/
|
||||||
|
using ConsumeCallback = std::function<void(const std::string &consumer)>;
|
||||||
|
using CancelCallback = std::function<void(const std::string &consumer)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receiving messages, either via consume(), get() or as returned messages
|
||||||
|
* The following methods receive the returned message in multiple parts
|
||||||
|
*/
|
||||||
|
using StartCallback = std::function<void(const std::string &exchange, const std::string &routingkey)>;
|
||||||
|
using HeaderCallback = std::function<void(const MetaData &metaData)>;
|
||||||
|
using DataCallback = std::function<void(const char *data, size_t size)>;
|
||||||
|
using DeliveredCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For returned messages amqp-cpp first calls a return-callback before the start,
|
||||||
|
* header and data callbacks are called. Instead of the deliver-callback, a
|
||||||
|
* returned-callback is called.
|
||||||
|
*/
|
||||||
|
using ReturnCallback = std::function<void(int16_t code, const std::string &message)>;
|
||||||
|
using ReturnedCallback = std::function<void()>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If you do not want to merge all data into a single string, you can als
|
||||||
|
* implement callbacks that return the collected message.
|
||||||
|
*/
|
||||||
|
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
|
||||||
|
using BounceCallback = std::function<void(const Message &message, int16_t code, const std::string &description)>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End namespace
|
* End namespace
|
||||||
|
|
|
||||||
|
|
@ -341,17 +341,42 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publish a message to an exchange
|
* Publish a message to an exchange
|
||||||
*
|
*
|
||||||
|
* You have to supply the name of an exchange and a routing key. RabbitMQ will then try
|
||||||
|
* to send the message to one or more queues. With the optional flags parameter you can
|
||||||
|
* specify what should happen if the message could not be routed to a queue. By default,
|
||||||
|
* unroutable message are silently discarded.
|
||||||
|
*
|
||||||
|
* This method returns a reference to a DeferredPublisher object. You can use this returned
|
||||||
|
* object to install callbacks that are called when an undeliverable message is returned, or
|
||||||
|
* to set the callback that is called when the server confirms that the message was received.
|
||||||
|
*
|
||||||
|
* To enable handling returned messages, or to enable publisher-confirms, you must not only
|
||||||
|
* set the callback, but also pass in appropriate flags to enable this feature. If you do not
|
||||||
|
* pass in these flags, your callbacks will not be called. If you are not at all interested
|
||||||
|
* in returned messages or publish-confirms, you can ignore the flag and the returned object.
|
||||||
|
*
|
||||||
|
* Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the
|
||||||
|
* publish() method. This means that the callbacks that you install for the first published
|
||||||
|
* message are also used for subsequent messages _and_ it means that if you install a different
|
||||||
|
* callback for a later publish operation, it overwrites your earlier callbacks
|
||||||
|
*
|
||||||
|
* The following flags can be supplied:
|
||||||
|
*
|
||||||
|
* - mandatory If set, server returns messages that are not sent to a queue
|
||||||
|
* - immediate If set, server returns messages that can not immediately be forwarded to a consumer.
|
||||||
|
*
|
||||||
* @param exchange the exchange to publish to
|
* @param exchange the exchange to publish to
|
||||||
* @param routingkey the routing key
|
* @param routingkey the routing key
|
||||||
* @param envelope the full envelope to send
|
* @param envelope the full envelope to send
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
*/
|
*/
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); }
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); }
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); }
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); }
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the Quality of Service (QOS) for this channel
|
* Set the Quality of Service (QOS) for this channel
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ namespace AMQP {
|
||||||
/**
|
/**
|
||||||
* Forward declarations
|
* Forward declarations
|
||||||
*/
|
*/
|
||||||
class DeferredConsumerBase;
|
class DeferredReceiver;
|
||||||
class BasicDeliverFrame;
|
class BasicDeliverFrame;
|
||||||
class DeferredConsumer;
|
class DeferredConsumer;
|
||||||
class BasicGetOKFrame;
|
class BasicGetOKFrame;
|
||||||
|
|
@ -44,6 +44,7 @@ class DeferredDelete;
|
||||||
class DeferredCancel;
|
class DeferredCancel;
|
||||||
class DeferredQueue;
|
class DeferredQueue;
|
||||||
class DeferredGet;
|
class DeferredGet;
|
||||||
|
class DeferredPublisher;
|
||||||
class Connection;
|
class Connection;
|
||||||
class Envelope;
|
class Envelope;
|
||||||
class Table;
|
class Table;
|
||||||
|
|
@ -74,10 +75,16 @@ private:
|
||||||
ErrorCallback _errorCallback;
|
ErrorCallback _errorCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handlers for all consumers that are active
|
* Handler that deals with incoming messages as a result of publish operations
|
||||||
* @var std::map<std::string,std::shared_ptr<DeferredConsumerBase>
|
* @var std::shared_ptr<DeferredPublisher>
|
||||||
*/
|
*/
|
||||||
std::map<std::string,std::shared_ptr<DeferredConsumerBase>> _consumers;
|
std::shared_ptr<DeferredPublisher> _publisher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handlers for all consumers that are active
|
||||||
|
* @var std::map<std::string,std::shared_ptr<DeferredConsumer>
|
||||||
|
*/
|
||||||
|
std::map<std::string,std::shared_ptr<DeferredConsumer>> _consumers;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pointer to the oldest deferred result (the first one that is going
|
* Pointer to the oldest deferred result (the first one that is going
|
||||||
|
|
@ -129,10 +136,10 @@ private:
|
||||||
bool _synchronous = false;
|
bool _synchronous = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The current consumer receiving a message
|
* The current object that is busy receiving a message
|
||||||
* @var std::shared_ptr<DeferredConsumerBase>
|
* @var std::shared_ptr<DeferredReceiver>
|
||||||
*/
|
*/
|
||||||
std::shared_ptr<DeferredConsumerBase> _consumer;
|
std::shared_ptr<DeferredReceiver> _receiver;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attach the connection
|
* Attach the connection
|
||||||
|
|
@ -396,16 +403,17 @@ public:
|
||||||
* Publish a message to an exchange
|
* Publish a message to an exchange
|
||||||
*
|
*
|
||||||
* If the mandatory or immediate flag is set, and the message could not immediately
|
* If the mandatory or immediate flag is set, and the message could not immediately
|
||||||
* be published, the message will be returned to the client. However, the AMQP-CPP
|
* be published, the message will be returned to the client.
|
||||||
* library does not yet report such returned messages.
|
|
||||||
*
|
*
|
||||||
* @param exchange the exchange to publish to
|
* @param exchange the exchange to publish to
|
||||||
* @param routingkey the routing key
|
* @param routingkey the routing key
|
||||||
* @param envelope the full envelope to send
|
* @param envelope the full envelope to send
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
|
* @return DeferredPublisher
|
||||||
*/
|
*/
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope);
|
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the Quality of Service (QOS) of the entire connection
|
* Set the Quality of Service (QOS) of the entire connection
|
||||||
|
|
@ -659,18 +667,23 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Install a consumer
|
* Install a consumer
|
||||||
*
|
|
||||||
* @param consumertag The consumer tag
|
* @param consumertag The consumer tag
|
||||||
* @param consumer The consumer handler
|
* @param consumer The consumer object
|
||||||
* @param active Is this the new active consumer
|
|
||||||
*/
|
*/
|
||||||
void install(std::string consumertag, const std::shared_ptr<DeferredConsumerBase> &consumer, bool active = false)
|
void install(const std::string &consumertag, const std::shared_ptr<DeferredConsumer> &consumer)
|
||||||
{
|
{
|
||||||
// install the consumer handler
|
// install the consumer handler
|
||||||
_consumers[consumertag] = consumer;
|
_consumers[consumertag] = consumer;
|
||||||
|
}
|
||||||
|
|
||||||
// should we become the current consumer?
|
/**
|
||||||
if (active) _consumer = consumer;
|
* Install the current consumer
|
||||||
|
* @param receiver The receiver object
|
||||||
|
*/
|
||||||
|
void install(const std::shared_ptr<DeferredReceiver> &receiver)
|
||||||
|
{
|
||||||
|
// store object as current receiver
|
||||||
|
_receiver = receiver;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -684,23 +697,23 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process incoming delivery
|
* Fetch the receiver for a specific consumer tag
|
||||||
*
|
* @param consumertag the consumer tag
|
||||||
* @param frame The frame to process
|
* @return the receiver object
|
||||||
*/
|
*/
|
||||||
void process(BasicDeliverFrame &frame);
|
DeferredConsumer *consumer(const std::string &consumertag) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the current consumer handler
|
* Retrieve the current object that is receiving a message
|
||||||
*
|
|
||||||
* @return The handler responsible for the current message
|
* @return The handler responsible for the current message
|
||||||
*/
|
*/
|
||||||
DeferredConsumerBase *consumer();
|
DeferredReceiver *receiver() const { return _receiver.get(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark the current consumer as done
|
* Retrieve the deferred publisher that handles returned messages
|
||||||
|
* @return The deferred publisher object
|
||||||
*/
|
*/
|
||||||
void complete();
|
DeferredPublisher *publisher() const { return _publisher.get(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The channel class is its friend, thus can it instantiate this object
|
* The channel class is its friend, thus can it instantiate this object
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
*
|
*
|
||||||
* Deferred callback for consumers
|
* Deferred callback for consumers
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -14,17 +14,22 @@
|
||||||
/**
|
/**
|
||||||
* Dependencies
|
* Dependencies
|
||||||
*/
|
*/
|
||||||
#include "deferredconsumerbase.h"
|
#include "deferredextreceiver.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
*/
|
*/
|
||||||
namespace AMQP {
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forward declararions
|
||||||
|
*/
|
||||||
|
class BasicDeliverFrame;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We extend from the default deferred and add extra functionality
|
* We extend from the default deferred and add extra functionality
|
||||||
*/
|
*/
|
||||||
class DeferredConsumer : public DeferredConsumerBase
|
class DeferredConsumer : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredConsumer>
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
|
|
@ -33,6 +38,13 @@ private:
|
||||||
*/
|
*/
|
||||||
ConsumeCallback _consumeCallback;
|
ConsumeCallback _consumeCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a delivery frame
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void process(BasicDeliverFrame &frame);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report success for frames that report start consumer operations
|
* Report success for frames that report start consumer operations
|
||||||
* @param name Consumer tag that is started
|
* @param name Consumer tag that is started
|
||||||
|
|
@ -41,12 +53,10 @@ private:
|
||||||
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
|
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Announce that a message has been received
|
* Get reference to self to prevent that object falls out of scope
|
||||||
* @param message The message to announce
|
* @return std::shared_ptr
|
||||||
* @param deliveryTag The delivery tag (for ack()ing)
|
|
||||||
* @param redelivered Is this a redelivered message
|
|
||||||
*/
|
*/
|
||||||
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
|
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The channel implementation may call our
|
* The channel implementation may call our
|
||||||
|
|
@ -54,11 +64,11 @@ private:
|
||||||
*/
|
*/
|
||||||
friend class ChannelImpl;
|
friend class ChannelImpl;
|
||||||
friend class ConsumedMessage;
|
friend class ConsumedMessage;
|
||||||
|
friend class BasicDeliverFrame;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Protected constructor that can only be called
|
* Constructor that should only be called from within the channel implementation
|
||||||
* from within the channel implementation
|
|
||||||
*
|
*
|
||||||
* Note: this constructor _should_ be protected, but because make_shared
|
* Note: this constructor _should_ be protected, but because make_shared
|
||||||
* will then not work, we have decided to make it public after all,
|
* will then not work, we have decided to make it public after all,
|
||||||
|
|
@ -68,7 +78,7 @@ public:
|
||||||
* @param failed are we already failed?
|
* @param failed are we already failed?
|
||||||
*/
|
*/
|
||||||
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
|
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
|
||||||
DeferredConsumerBase(failed, channel) {}
|
DeferredExtReceiver(failed, channel) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
|
@ -156,15 +166,46 @@ public:
|
||||||
* @param callback The callback to invoke
|
* @param callback The callback to invoke
|
||||||
* @return Same object for chaining
|
* @return Same object for chaining
|
||||||
*/
|
*/
|
||||||
DeferredConsumer &onBegin(const BeginCallback &callback)
|
DeferredConsumer &onBegin(const StartCallback &callback)
|
||||||
{
|
{
|
||||||
// store callback
|
// store callback
|
||||||
_beginCallback = callback;
|
_startCallback = callback;
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function that is called when the start frame of a new
|
||||||
|
* consumed message is received
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredConsumer &onStart(const StartCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_startCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a function that is called when the message size is known
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for message headers
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredConsumer &onSize(const SizeCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_sizeCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the function that is called when message headers come in
|
* Register the function that is called when message headers come in
|
||||||
*
|
*
|
||||||
|
|
@ -208,10 +249,25 @@ public:
|
||||||
* @param callback The callback to invoke
|
* @param callback The callback to invoke
|
||||||
* @return Same object for chaining
|
* @return Same object for chaining
|
||||||
*/
|
*/
|
||||||
DeferredConsumer &onComplete(const CompleteCallback &callback)
|
DeferredConsumer &onComplete(const DeliveredCallback &callback)
|
||||||
{
|
{
|
||||||
// store callback
|
// store callback
|
||||||
_completeCallback = callback;
|
_deliveredCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a funtion to be called when a message was completely received
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredConsumer &onDelivered(const DeliveredCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_deliveredCallback = callback;
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return *this;
|
return *this;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
/**
|
||||||
|
* DeferredExtReceiver.h
|
||||||
|
*
|
||||||
|
* Extended receiver that _wants_ to receive message (because it is
|
||||||
|
* consuming or get'ting messages. This is the base class for both
|
||||||
|
* the DeferredConsumer as well as the DeferredGet classes, but not
|
||||||
|
* the base of the DeferredPublisher (which can also receive returned
|
||||||
|
* messages, but not as a result of an explicit request)
|
||||||
|
*
|
||||||
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include "deferredreceiver.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class DeferredExtReceiver : public DeferredReceiver
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* The delivery tag for the current message
|
||||||
|
* @var uint64_t
|
||||||
|
*/
|
||||||
|
uint64_t _deliveryTag = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this a redelivered message
|
||||||
|
* @var bool
|
||||||
|
*/
|
||||||
|
bool _redelivered = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback for incoming messages
|
||||||
|
* @var MessageCallback
|
||||||
|
*/
|
||||||
|
MessageCallback _messageCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback for when a message was complete finished
|
||||||
|
* @var DeliveredCallback
|
||||||
|
*/
|
||||||
|
DeliveredCallback _deliveredCallback;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the object to send out a message
|
||||||
|
* @param exchange the exchange to which the message was published
|
||||||
|
* @param routingkey the routing key that was used to publish the message
|
||||||
|
*/
|
||||||
|
virtual void initialize(const std::string &exchange, const std::string &routingkey) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that a message was done
|
||||||
|
*/
|
||||||
|
virtual void complete() override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param failed Have we already failed?
|
||||||
|
* @param channel The channel we are consuming on
|
||||||
|
*/
|
||||||
|
DeferredExtReceiver(bool failed, ChannelImpl *channel) :
|
||||||
|
DeferredReceiver(failed, channel) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~DeferredExtReceiver() = default;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
* DeferredGet.h
|
* DeferredGet.h
|
||||||
*
|
*
|
||||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -13,7 +13,7 @@
|
||||||
/**
|
/**
|
||||||
* Dependencies
|
* Dependencies
|
||||||
*/
|
*/
|
||||||
#include "deferredconsumerbase.h"
|
#include "deferredextreceiver.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -27,7 +27,7 @@ namespace AMQP {
|
||||||
* it grabs a self-pointer when the callback is running, otherwise the onFinalize()
|
* it grabs a self-pointer when the callback is running, otherwise the onFinalize()
|
||||||
* is called before the actual message is consumed.
|
* is called before the actual message is consumed.
|
||||||
*/
|
*/
|
||||||
class DeferredGet : public DeferredConsumerBase
|
class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredGet>
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
|
|
@ -40,7 +40,7 @@ private:
|
||||||
* Callback with the number of messages still in the queue
|
* Callback with the number of messages still in the queue
|
||||||
* @var SizeCallback
|
* @var SizeCallback
|
||||||
*/
|
*/
|
||||||
SizeCallback _sizeCallback;
|
SizeCallback _countCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report success for a get operation
|
* Report success for a get operation
|
||||||
|
|
@ -57,12 +57,15 @@ private:
|
||||||
virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
|
virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Announce that a message has been received
|
* Get reference to self to prevent that object falls out of scope
|
||||||
* @param message The message to announce
|
* @return std::shared_ptr
|
||||||
* @param deliveryTag The delivery tag (for ack()ing)
|
|
||||||
* @param redelivered Is this a redelivered message
|
|
||||||
*/
|
*/
|
||||||
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
|
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extended implementation of the complete method that is called when a message was fully received
|
||||||
|
*/
|
||||||
|
virtual void complete() override;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The channel implementation may call our
|
* The channel implementation may call our
|
||||||
|
|
@ -84,61 +87,9 @@ public:
|
||||||
* @param failed are we already failed?
|
* @param failed are we already failed?
|
||||||
*/
|
*/
|
||||||
DeferredGet(ChannelImpl *channel, bool failed = false) :
|
DeferredGet(ChannelImpl *channel, bool failed = false) :
|
||||||
DeferredConsumerBase(failed, channel) {}
|
DeferredExtReceiver(failed, channel) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
|
||||||
* Register the function to be called when a new message is expected
|
|
||||||
*
|
|
||||||
* @param callback The callback to invoke
|
|
||||||
* @return Same object for chaining
|
|
||||||
*/
|
|
||||||
DeferredGet &onBegin(const BeginCallback &callback)
|
|
||||||
{
|
|
||||||
// store callback
|
|
||||||
_beginCallback = callback;
|
|
||||||
|
|
||||||
// allow chaining
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register the function to be called when message headers come in
|
|
||||||
*
|
|
||||||
* @param callback The callback to invoke for message headers
|
|
||||||
* @return Same object for chaining
|
|
||||||
*/
|
|
||||||
DeferredGet &onHeaders(const HeaderCallback &callback)
|
|
||||||
{
|
|
||||||
// store callback
|
|
||||||
_headerCallback = callback;
|
|
||||||
|
|
||||||
// allow chaining
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register the function to be called when a chunk of data comes in
|
|
||||||
*
|
|
||||||
* Note that this function may be called zero, one or multiple times
|
|
||||||
* for each incoming message depending on the size of the message data.
|
|
||||||
*
|
|
||||||
* If you install this callback you very likely also want to install
|
|
||||||
* the onComplete callback so you know when the last data part was
|
|
||||||
* received.
|
|
||||||
*
|
|
||||||
* @param callback The callback to invoke for chunks of message data
|
|
||||||
* @return Same object for chaining
|
|
||||||
*/
|
|
||||||
DeferredGet &onData(const DataCallback &callback)
|
|
||||||
{
|
|
||||||
// store callback
|
|
||||||
_dataCallback = callback;
|
|
||||||
|
|
||||||
// allow chaining
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a function to be called when a message arrives
|
* Register a function to be called when a message arrives
|
||||||
* This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
|
* This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
|
||||||
|
|
@ -195,13 +146,95 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a function to be called when size information is known
|
* Register a function to be called when queue size information is known
|
||||||
* @param callback the callback to execute
|
* @param callback the callback to execute
|
||||||
*/
|
*/
|
||||||
|
DeferredGet &onCount(const SizeCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_countCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function to be called when a new message is expected
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredGet &onBegin(const StartCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_startCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function to be called when a new message is expected
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredGet &onStart(const StartCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_startCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a function that is called when the message size is known
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for message headers
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
DeferredGet &onSize(const SizeCallback &callback)
|
DeferredGet &onSize(const SizeCallback &callback)
|
||||||
{
|
{
|
||||||
// store callback
|
// store callback
|
||||||
_sizeCallback = callback;
|
_sizeCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function to be called when message headers come in
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for message headers
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredGet &onHeaders(const HeaderCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_headerCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function to be called when a chunk of data comes in
|
||||||
|
*
|
||||||
|
* Note that this function may be called zero, one or multiple times
|
||||||
|
* for each incoming message depending on the size of the message data.
|
||||||
|
*
|
||||||
|
* If you install this callback you very likely also want to install
|
||||||
|
* the onComplete callback so you know when the last data part was
|
||||||
|
* received.
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for chunks of message data
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredGet &onData(const DataCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_dataCallback = callback;
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return *this;
|
return *this;
|
||||||
|
|
@ -213,10 +246,25 @@ public:
|
||||||
* @param callback The callback to invoke
|
* @param callback The callback to invoke
|
||||||
* @return Same object for chaining
|
* @return Same object for chaining
|
||||||
*/
|
*/
|
||||||
DeferredGet &onComplete(const CompleteCallback &callback)
|
DeferredGet &onComplete(const DeliveredCallback &callback)
|
||||||
{
|
{
|
||||||
// store callback
|
// store callback
|
||||||
_completeCallback = callback;
|
_deliveredCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a funtion to be called when a message was completely received
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredGet &onDelivered(const DeliveredCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_deliveredCallback = callback;
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return *this;
|
return *this;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,241 @@
|
||||||
|
/**
|
||||||
|
* DeferredPublisher.h
|
||||||
|
*
|
||||||
|
* Class that is returned when channel::publish() is called, and that
|
||||||
|
* can be used to install callback methods that define how returned
|
||||||
|
* messages should be handled.
|
||||||
|
*
|
||||||
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forward declarations
|
||||||
|
*/
|
||||||
|
class ChannelImpl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this<DeferredPublisher>
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* The error code
|
||||||
|
* @var int16_t
|
||||||
|
*/
|
||||||
|
int16_t _code = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The error message
|
||||||
|
* @var std::string
|
||||||
|
*/
|
||||||
|
std::string _description;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is called when a message is returned
|
||||||
|
* @var BounceCallback
|
||||||
|
*/
|
||||||
|
BounceCallback _bounceCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of a bounced message
|
||||||
|
* @var ReturnCallback
|
||||||
|
*/
|
||||||
|
ReturnCallback _beginCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of a bounced message
|
||||||
|
* @var ReturnedCallback
|
||||||
|
*/
|
||||||
|
ReturnedCallback _completeCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a return frame
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void process(BasicReturnFrame &frame);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get reference to self to prevent that object falls out of scope
|
||||||
|
* @return std::shared_ptr
|
||||||
|
*/
|
||||||
|
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extended implementation of the complete method that is called when a message was fully received
|
||||||
|
*/
|
||||||
|
virtual void complete() override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Classes that can access private members
|
||||||
|
*/
|
||||||
|
friend class BasicReturnFrame;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor that should only be called from within the channel implementation
|
||||||
|
*
|
||||||
|
* Note: this constructor _should_ be protected, but because make_shared
|
||||||
|
* will then not work, we have decided to make it public after all,
|
||||||
|
* because the work-around would result in not-so-easy-to-read code.
|
||||||
|
*
|
||||||
|
* @param channel the channel implementation
|
||||||
|
* @param failed are we already failed?
|
||||||
|
*/
|
||||||
|
DeferredPublisher(ChannelImpl *channel, bool failed = false) :
|
||||||
|
DeferredReceiver(failed, channel) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Register a function to be called when a full message is returned
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onReceived(const BounceCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_bounceCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alias for onReceived() (see above)
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onMessage(const BounceCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_bounceCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alias for onReceived() (see above)
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onReturned(const BounceCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_bounceCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alias for onReceived() (see above)
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onBounced(const BounceCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_bounceCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function that is called when the start frame of a new
|
||||||
|
* consumed message is received
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onBegin(const ReturnCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_beginCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a function that is called when the message size is known
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for message headers
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onSize(const SizeCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_sizeCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function that is called when message headers come in
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for message headers
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onHeaders(const HeaderCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_headerCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the function to be called when a chunk of data comes in
|
||||||
|
*
|
||||||
|
* Note that this function may be called zero, one or multiple times
|
||||||
|
* for each incoming message depending on the size of the message data.
|
||||||
|
*
|
||||||
|
* If you install this callback you very likely also want to install
|
||||||
|
* the onComplete callback so you know when the last data part was
|
||||||
|
* received.
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke for chunks of message data
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onData(const DataCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_dataCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a funtion to be called when a message was completely received
|
||||||
|
*
|
||||||
|
* @param callback The callback to invoke
|
||||||
|
* @return Same object for chaining
|
||||||
|
*/
|
||||||
|
DeferredPublisher &onComplete(const ReturnedCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_completeCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
/**
|
/**
|
||||||
* deferredconsumerbase.h
|
* DeferredReceiver.h
|
||||||
*
|
*
|
||||||
* Base class for the deferred consumer and the
|
* Base class for the deferred consumer, the deferred get and the
|
||||||
* deferred get.
|
* deferred publisher (that may receive returned messages)
|
||||||
*
|
*
|
||||||
* @copyright 2016 - 2017 Copernica B.V.
|
* @copyright 2016 - 2018 Copernica B.V.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -35,9 +35,7 @@ class BodyFrame;
|
||||||
/**
|
/**
|
||||||
* Base class for deferred consumers
|
* Base class for deferred consumers
|
||||||
*/
|
*/
|
||||||
class DeferredConsumerBase :
|
class DeferredReceiver : public Deferred
|
||||||
public Deferred,
|
|
||||||
public std::enable_shared_from_this<DeferredConsumerBase>
|
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
|
|
@ -46,20 +44,27 @@ private:
|
||||||
*/
|
*/
|
||||||
uint64_t _bodySize = 0;
|
uint64_t _bodySize = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* Process a delivery frame
|
|
||||||
*
|
|
||||||
* @param frame The frame to process
|
|
||||||
*/
|
|
||||||
void process(BasicDeliverFrame &frame);
|
|
||||||
|
|
||||||
|
protected:
|
||||||
/**
|
/**
|
||||||
* Process a delivery frame from a get request
|
* Initialize the object to send out a message
|
||||||
*
|
* @param exchange the exchange to which the message was published
|
||||||
* @param frame The frame to process
|
* @param routingkey the routing key that was used to publish the message
|
||||||
*/
|
*/
|
||||||
void process(BasicGetOKFrame &frame);
|
virtual void initialize(const std::string &exchange, const std::string &routingkey);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get reference to self to prevent that object falls out of scope
|
||||||
|
* @return std::shared_ptr
|
||||||
|
*/
|
||||||
|
virtual std::shared_ptr<DeferredReceiver> lock() = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that a message was done
|
||||||
|
*/
|
||||||
|
virtual void complete() = 0;
|
||||||
|
|
||||||
|
private:
|
||||||
/**
|
/**
|
||||||
* Process the message headers
|
* Process the message headers
|
||||||
*
|
*
|
||||||
|
|
@ -74,40 +79,15 @@ private:
|
||||||
*/
|
*/
|
||||||
void process(BodyFrame &frame);
|
void process(BodyFrame &frame);
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicate that a message was done
|
|
||||||
*/
|
|
||||||
void complete();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Announce that a message has been received
|
|
||||||
* @param message The message to announce
|
|
||||||
* @param deliveryTag The delivery tag (for ack()ing)
|
|
||||||
* @param redelivered Is this a redelivered message
|
|
||||||
*/
|
|
||||||
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Frames may be processed
|
* Frames may be processed
|
||||||
*/
|
*/
|
||||||
friend class ChannelImpl;
|
friend class ChannelImpl;
|
||||||
friend class BasicDeliverFrame;
|
|
||||||
friend class BasicGetOKFrame;
|
friend class BasicGetOKFrame;
|
||||||
friend class BasicHeaderFrame;
|
friend class BasicHeaderFrame;
|
||||||
friend class BodyFrame;
|
friend class BodyFrame;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/**
|
|
||||||
* The delivery tag for the current message
|
|
||||||
* @var uint64_t
|
|
||||||
*/
|
|
||||||
uint64_t _deliveryTag = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Is this a redelivered message
|
|
||||||
* @var bool
|
|
||||||
*/
|
|
||||||
bool _redelivered = false;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The channel to which the consumer is linked
|
* The channel to which the consumer is linked
|
||||||
* @var ChannelImpl
|
* @var ChannelImpl
|
||||||
|
|
@ -116,9 +96,15 @@ protected:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback for new message
|
* Callback for new message
|
||||||
* @var BeginCallback
|
* @var StartCallback
|
||||||
*/
|
*/
|
||||||
BeginCallback _beginCallback;
|
StartCallback _startCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is called when size of the message is known
|
||||||
|
* @var SizeCallback
|
||||||
|
*/
|
||||||
|
SizeCallback _sizeCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback for incoming headers
|
* Callback for incoming headers
|
||||||
|
|
@ -132,18 +118,6 @@ protected:
|
||||||
*/
|
*/
|
||||||
DataCallback _dataCallback;
|
DataCallback _dataCallback;
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback for incoming messages
|
|
||||||
* @var MessageCallback
|
|
||||||
*/
|
|
||||||
MessageCallback _messageCallback;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback for when a message was complete finished
|
|
||||||
* @var CompleteCallback
|
|
||||||
*/
|
|
||||||
CompleteCallback _completeCallback;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The message that we are currently receiving
|
* The message that we are currently receiving
|
||||||
* @var stack_ptr<Message>
|
* @var stack_ptr<Message>
|
||||||
|
|
@ -152,12 +126,17 @@ protected:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*
|
|
||||||
* @param failed Have we already failed?
|
* @param failed Have we already failed?
|
||||||
* @param channel The channel we are consuming on
|
* @param channel The channel we are consuming on
|
||||||
*/
|
*/
|
||||||
DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {}
|
DeferredReceiver(bool failed, ChannelImpl *channel) :
|
||||||
|
Deferred(failed), _channel(channel) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~DeferredReceiver() = default;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -7,7 +7,7 @@
|
||||||
* Message objects can not be constructed by end users, they are only constructed
|
* Message objects can not be constructed by end users, they are only constructed
|
||||||
* by the AMQP library, and passed to user callbacks.
|
* by the AMQP library, and passed to user callbacks.
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -31,7 +31,7 @@ namespace AMQP {
|
||||||
/**
|
/**
|
||||||
* Forward declarations
|
* Forward declarations
|
||||||
*/
|
*/
|
||||||
class DeferredConsumerBase;
|
class DeferredReceiver;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class definition
|
* Class definition
|
||||||
|
|
@ -61,7 +61,7 @@ protected:
|
||||||
/**
|
/**
|
||||||
* We are an open book to the consumer handler
|
* We are an open book to the consumer handler
|
||||||
*/
|
*/
|
||||||
friend class DeferredConsumerBase;
|
friend class DeferredReceiver;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the body size
|
* Set the body size
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,9 @@ add_sources(
|
||||||
consumedmessage.h
|
consumedmessage.h
|
||||||
deferredcancel.cpp
|
deferredcancel.cpp
|
||||||
deferredconsumer.cpp
|
deferredconsumer.cpp
|
||||||
deferredconsumerbase.cpp
|
deferredreceiver.cpp
|
||||||
|
deferredextreceiver.cpp
|
||||||
|
deferredpublisher.cpp
|
||||||
deferredget.cpp
|
deferredget.cpp
|
||||||
exchangebindframe.h
|
exchangebindframe.h
|
||||||
exchangebindokframe.h
|
exchangebindokframe.h
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Class describing a basic deliver frame
|
* Class describing a basic deliver frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
#include "amqpcpp/stringfield.h"
|
#include "amqpcpp/stringfield.h"
|
||||||
#include "amqpcpp/booleanset.h"
|
#include "amqpcpp/booleanset.h"
|
||||||
#include "amqpcpp/connectionimpl.h"
|
#include "amqpcpp/connectionimpl.h"
|
||||||
|
#include "amqpcpp/deferredconsumer.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -193,8 +194,14 @@ public:
|
||||||
// channel does not exist
|
// channel does not exist
|
||||||
if (!channel) return false;
|
if (!channel) return false;
|
||||||
|
|
||||||
// construct the message
|
// get the appropriate consumer object
|
||||||
channel->process(*this);
|
auto consumer = channel->consumer(_consumerTag);
|
||||||
|
|
||||||
|
// skip if there was no consumer for this tag
|
||||||
|
if (consumer == nullptr) return false;
|
||||||
|
|
||||||
|
// initialize the object, because we're about to receive a message
|
||||||
|
consumer->process(*this);
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Class describing a basic get ok frame
|
* Class describing a basic get ok frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -170,14 +170,17 @@ public:
|
||||||
// channel does not exist
|
// channel does not exist
|
||||||
if (!channel) return false;
|
if (!channel) return false;
|
||||||
|
|
||||||
// report success for the get operation
|
// report success for the get operation (this will also update the current receiver!)
|
||||||
channel->reportSuccess(messageCount(), deliveryTag(), redelivered());
|
channel->reportSuccess(messageCount(), _deliveryTag, redelivered());
|
||||||
|
|
||||||
// check if we have a valid consumer
|
// get the current receiver object
|
||||||
if (!channel->consumer()) return false;
|
auto *receiver = channel->receiver();
|
||||||
|
|
||||||
// pass on to consumer
|
// check if we have a valid receiver
|
||||||
channel->consumer()->process(*this);
|
if (receiver == nullptr) return false;
|
||||||
|
|
||||||
|
// initialize the receiver for the upcoming message
|
||||||
|
receiver->initialize(_exchange, _routingKey);
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Class describing an AMQP basic header frame
|
* Class describing an AMQP basic header frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -16,7 +16,7 @@
|
||||||
#include "amqpcpp/metadata.h"
|
#include "amqpcpp/metadata.h"
|
||||||
#include "amqpcpp/envelope.h"
|
#include "amqpcpp/envelope.h"
|
||||||
#include "amqpcpp/connectionimpl.h"
|
#include "amqpcpp/connectionimpl.h"
|
||||||
#include "amqpcpp/deferredconsumerbase.h"
|
#include "amqpcpp/deferredreceiver.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -134,12 +134,18 @@ public:
|
||||||
{
|
{
|
||||||
// we need the appropriate channel
|
// we need the appropriate channel
|
||||||
auto channel = connection->channel(this->channel());
|
auto channel = connection->channel(this->channel());
|
||||||
|
|
||||||
|
// we need a channel
|
||||||
|
if (channel == nullptr) return false;
|
||||||
|
|
||||||
|
// do we have an object that is receiving this data?
|
||||||
|
auto *receiver = channel->receiver();
|
||||||
|
|
||||||
// check if we have a valid channel and consumer
|
// check if we have a valid channel and consumer
|
||||||
if (!channel || !channel->consumer()) return false;
|
if (receiver == nullptr) return false;
|
||||||
|
|
||||||
// the channel can process the frame
|
// the channel can process the frame
|
||||||
channel->consumer()->process(*this);
|
receiver->process(*this);
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Class describing a basic return frame
|
* Class describing a basic return frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -155,8 +155,23 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual bool process(ConnectionImpl *connection) override
|
virtual bool process(ConnectionImpl *connection) override
|
||||||
{
|
{
|
||||||
// we no longer support returned messages
|
// we need the appropriate channel
|
||||||
return false;
|
auto channel = connection->channel(this->channel());
|
||||||
|
|
||||||
|
// channel does not exist
|
||||||
|
if (!channel) return false;
|
||||||
|
|
||||||
|
// get the current publisher
|
||||||
|
auto publisher = channel->publisher();
|
||||||
|
|
||||||
|
// if there is no deferred publisher, we can just as well stop
|
||||||
|
if (publisher == nullptr) return false;
|
||||||
|
|
||||||
|
// initialize the object, because we're about to receive a message
|
||||||
|
publisher->process(*this);
|
||||||
|
|
||||||
|
// done
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Class describing an AMQP Body Frame
|
* Class describing an AMQP Body Frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
#include "extframe.h"
|
#include "extframe.h"
|
||||||
#include "amqpcpp/connectionimpl.h"
|
#include "amqpcpp/connectionimpl.h"
|
||||||
#include "amqpcpp/deferredconsumerbase.h"
|
#include "amqpcpp/deferredreceiver.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -105,12 +105,18 @@ public:
|
||||||
{
|
{
|
||||||
// we need the appropriate channel
|
// we need the appropriate channel
|
||||||
auto channel = connection->channel(this->channel());
|
auto channel = connection->channel(this->channel());
|
||||||
|
|
||||||
|
// we must have a channel object
|
||||||
|
if (channel == nullptr) return false;
|
||||||
|
|
||||||
|
// get the object that is receiving the messages
|
||||||
|
auto *receiver = channel->receiver();
|
||||||
|
|
||||||
// check if we have a valid channel and consumer
|
// check if we have a valid receiver
|
||||||
if (!channel || !channel->consumer()) return false;
|
if (receiver == nullptr) return false;
|
||||||
|
|
||||||
// the consumer may process the frame
|
// the consumer may process the frame
|
||||||
channel->consumer()->process(*this);
|
receiver->process(*this);
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@
|
||||||
* @copyright 2014 - 2018 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
#include "includes.h"
|
#include "includes.h"
|
||||||
#include "basicdeliverframe.h"
|
|
||||||
#include "basicgetokframe.h"
|
#include "basicgetokframe.h"
|
||||||
#include "basicreturnframe.h"
|
#include "basicreturnframe.h"
|
||||||
#include "consumedmessage.h"
|
#include "consumedmessage.h"
|
||||||
|
|
@ -452,26 +451,31 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
|
||||||
* @param envelope the full envelope to send
|
* @param envelope the full envelope to send
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
|
* @param flags
|
||||||
|
* @return DeferredPublisher
|
||||||
*/
|
*/
|
||||||
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
|
DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
||||||
{
|
{
|
||||||
// we are going to send out multiple frames, each one will trigger a call to the handler,
|
// we are going to send out multiple frames, each one will trigger a call to the handler,
|
||||||
// which in turn could destruct the channel object, we need to monitor that
|
// which in turn could destruct the channel object, we need to monitor that
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
// @todo do not copy the entire buffer to individual frames
|
// @todo do not copy the entire buffer to individual frames
|
||||||
|
|
||||||
|
// make sure we have a deferred object to return
|
||||||
|
if (!_publisher) _publisher.reset(new DeferredPublisher(this));
|
||||||
|
|
||||||
// send the publish frame
|
// send the publish frame
|
||||||
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false;
|
if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher;
|
||||||
|
|
||||||
// channel still valid?
|
// channel still valid?
|
||||||
if (!monitor.valid()) return false;
|
if (!monitor.valid()) return *_publisher;
|
||||||
|
|
||||||
// send header
|
// send header
|
||||||
if (!send(BasicHeaderFrame(_id, envelope))) return false;
|
if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher;
|
||||||
|
|
||||||
// channel and connection still valid?
|
// channel and connection still valid?
|
||||||
if (!monitor.valid() || !_connection) return false;
|
if (!monitor.valid() || !_connection) return *_publisher;
|
||||||
|
|
||||||
// the max payload size is the max frame size minus the bytes for headers and trailer
|
// the max payload size is the max frame size minus the bytes for headers and trailer
|
||||||
uint32_t maxpayload = _connection->maxPayload();
|
uint32_t maxpayload = _connection->maxPayload();
|
||||||
|
|
@ -488,10 +492,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
|
||||||
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
||||||
|
|
||||||
// send out a body frame
|
// send out a body frame
|
||||||
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false;
|
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher;
|
||||||
|
|
||||||
// channel still valid?
|
// channel still valid?
|
||||||
if (!monitor.valid()) return false;
|
if (!monitor.valid()) return *_publisher;
|
||||||
|
|
||||||
// update counters
|
// update counters
|
||||||
bytessent += chunksize;
|
bytessent += chunksize;
|
||||||
|
|
@ -499,7 +503,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
|
||||||
}
|
}
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return *_publisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -816,41 +820,17 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process incoming delivery
|
* Get the current receiver for a given consumer tag
|
||||||
*
|
* @param consumertag the consumer frame
|
||||||
* @param frame The frame to process
|
* @return DeferredConsumer
|
||||||
*/
|
*/
|
||||||
void ChannelImpl::process(BasicDeliverFrame &frame)
|
DeferredConsumer *ChannelImpl::consumer(const std::string &consumertag) const
|
||||||
{
|
{
|
||||||
// find the consumer for this frame
|
// look in the map
|
||||||
auto iter = _consumers.find(frame.consumerTag());
|
auto iter = _consumers.find(consumertag);
|
||||||
if (iter == _consumers.end()) return;
|
|
||||||
|
// return the result
|
||||||
// we are going to be receiving a message, store
|
return iter == _consumers.end() ? nullptr : iter->second.get();
|
||||||
// the handler for the incoming message
|
|
||||||
_consumer = iter->second;
|
|
||||||
|
|
||||||
// let the consumer process the frame
|
|
||||||
_consumer->process(frame);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the current consumer handler
|
|
||||||
*
|
|
||||||
* @return The handler responsible for the current message
|
|
||||||
*/
|
|
||||||
DeferredConsumerBase *ChannelImpl::consumer()
|
|
||||||
{
|
|
||||||
return _consumer.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark the current consumer as done
|
|
||||||
*/
|
|
||||||
void ChannelImpl::complete()
|
|
||||||
{
|
|
||||||
// no more consumer
|
|
||||||
_consumer.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,14 @@
|
||||||
/**
|
/**
|
||||||
* Base class for a message implementation
|
* Base class for a message implementation
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include "basicdeliverframe.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -3,15 +3,34 @@
|
||||||
*
|
*
|
||||||
* Implementation file for the DeferredConsumer class
|
* Implementation file for the DeferredConsumer class
|
||||||
*
|
*
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
#include "includes.h"
|
#include "includes.h"
|
||||||
|
#include "basicdeliverframe.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Namespace
|
* Namespace
|
||||||
*/
|
*/
|
||||||
namespace AMQP {
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a delivery frame
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void DeferredConsumer::process(BasicDeliverFrame &frame)
|
||||||
|
{
|
||||||
|
// this object will handle all future frames with header and body data
|
||||||
|
_channel->install(shared_from_this());
|
||||||
|
|
||||||
|
// retrieve the delivery tag and whether we were redelivered
|
||||||
|
_deliveryTag = frame.deliveryTag();
|
||||||
|
_redelivered = frame.redelivered();
|
||||||
|
|
||||||
|
// initialize the object for the next message
|
||||||
|
initialize(frame.exchange(), frame.routingKey());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report success for frames that report start consumer operations
|
* Report success for frames that report start consumer operations
|
||||||
* @param name Consumer tag that is started
|
* @param name Consumer tag that is started
|
||||||
|
|
@ -32,18 +51,6 @@ const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::stri
|
||||||
return _next;
|
return _next;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Announce that a message was received
|
|
||||||
* @param message The message to announce
|
|
||||||
* @param deliveryTag The delivery tag (for ack()ing)
|
|
||||||
* @param redelivered Is this a redelivered message
|
|
||||||
*/
|
|
||||||
void DeferredConsumer::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
|
|
||||||
{
|
|
||||||
// simply execute the message callback
|
|
||||||
_messageCallback(message, deliveryTag, redelivered);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End namespace
|
* End namespace
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1,142 +0,0 @@
|
||||||
/**
|
|
||||||
* deferredconsumerbase.cpp
|
|
||||||
*
|
|
||||||
* Base class for the deferred consumer and the
|
|
||||||
* deferred get.
|
|
||||||
*
|
|
||||||
* @copyright 2016 - 2017 Copernica B.V.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Dependencies
|
|
||||||
*/
|
|
||||||
#include "amqpcpp/deferredconsumerbase.h"
|
|
||||||
#include "basicdeliverframe.h"
|
|
||||||
#include "basicgetokframe.h"
|
|
||||||
#include "basicheaderframe.h"
|
|
||||||
#include "bodyframe.h"
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start namespace
|
|
||||||
*/
|
|
||||||
namespace AMQP {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process a delivery frame
|
|
||||||
*
|
|
||||||
* @param frame The frame to process
|
|
||||||
*/
|
|
||||||
void DeferredConsumerBase::process(BasicDeliverFrame &frame)
|
|
||||||
{
|
|
||||||
// retrieve the delivery tag and whether we were redelivered
|
|
||||||
_deliveryTag = frame.deliveryTag();
|
|
||||||
_redelivered = frame.redelivered();
|
|
||||||
|
|
||||||
// anybody interested in the new message?
|
|
||||||
if (_beginCallback) _beginCallback();
|
|
||||||
|
|
||||||
// do we have anybody interested in messages?
|
|
||||||
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process a delivery frame from a get request
|
|
||||||
*
|
|
||||||
* @param frame The frame to process
|
|
||||||
*/
|
|
||||||
void DeferredConsumerBase::process(BasicGetOKFrame &frame)
|
|
||||||
{
|
|
||||||
// retrieve the delivery tag and whether we were redelivered
|
|
||||||
_deliveryTag = frame.deliveryTag();
|
|
||||||
_redelivered = frame.redelivered();
|
|
||||||
|
|
||||||
// anybody interested in the new message?
|
|
||||||
if (_beginCallback) _beginCallback();
|
|
||||||
|
|
||||||
// do we have anybody interested in messages?
|
|
||||||
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process the message headers
|
|
||||||
*
|
|
||||||
* @param frame The frame to process
|
|
||||||
*/
|
|
||||||
void DeferredConsumerBase::process(BasicHeaderFrame &frame)
|
|
||||||
{
|
|
||||||
// store the body size
|
|
||||||
_bodySize = frame.bodySize();
|
|
||||||
|
|
||||||
// do we have a message?
|
|
||||||
if (_message)
|
|
||||||
{
|
|
||||||
// store the body size and metadata
|
|
||||||
_message->setBodySize(_bodySize);
|
|
||||||
_message->set(frame.metaData());
|
|
||||||
}
|
|
||||||
|
|
||||||
// anybody interested in the headers?
|
|
||||||
if (_headerCallback) _headerCallback(frame.metaData());
|
|
||||||
|
|
||||||
// no body data expected? then we are now complete
|
|
||||||
if (!_bodySize) complete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process the message data
|
|
||||||
*
|
|
||||||
* @param frame The frame to process
|
|
||||||
*/
|
|
||||||
void DeferredConsumerBase::process(BodyFrame &frame)
|
|
||||||
{
|
|
||||||
// make sure we stay in scope
|
|
||||||
auto self = shared_from_this();
|
|
||||||
|
|
||||||
// update the bytes still to receive
|
|
||||||
_bodySize -= frame.payloadSize();
|
|
||||||
|
|
||||||
// anybody interested in the data?
|
|
||||||
if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize());
|
|
||||||
|
|
||||||
// do we have a message? then append the data
|
|
||||||
if (_message) _message->append(frame.payload(), frame.payloadSize());
|
|
||||||
|
|
||||||
// if all bytes were received we are now complete
|
|
||||||
if (!_bodySize) complete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicate that a message was done
|
|
||||||
*/
|
|
||||||
void DeferredConsumerBase::complete()
|
|
||||||
{
|
|
||||||
// make sure we stay in scope
|
|
||||||
auto self = shared_from_this();
|
|
||||||
|
|
||||||
// also monitor the channel
|
|
||||||
Monitor monitor{ _channel };
|
|
||||||
|
|
||||||
// do we have a message?
|
|
||||||
if (_message)
|
|
||||||
{
|
|
||||||
// announce the message
|
|
||||||
announce(*_message, _deliveryTag, _redelivered);
|
|
||||||
|
|
||||||
// and destroy it
|
|
||||||
_message.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
// do we have to inform anyone about completion?
|
|
||||||
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
|
|
||||||
|
|
||||||
// do we still have a valid channel
|
|
||||||
if (!monitor.valid()) return;
|
|
||||||
|
|
||||||
// we are now done executing
|
|
||||||
_channel->complete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End namespace
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
/**
|
||||||
|
* DeferredExtReceiver.cpp
|
||||||
|
*
|
||||||
|
* Implementation file for the DeferredExtReceiver class
|
||||||
|
*
|
||||||
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include "amqpcpp/deferredextreceiver.h"
|
||||||
|
#include "amqpcpp/channelimpl.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the object to send out a message
|
||||||
|
* @param exchange the exchange to which the message was published
|
||||||
|
* @param routingkey the routing key that was used to publish the message
|
||||||
|
*/
|
||||||
|
void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey)
|
||||||
|
{
|
||||||
|
// call base
|
||||||
|
DeferredReceiver::initialize(exchange, routingkey);
|
||||||
|
|
||||||
|
// do we have anybody interested in messages? in that case we construct the message
|
||||||
|
if (_messageCallback) _message.construct(exchange, routingkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that a message was done
|
||||||
|
*/
|
||||||
|
void DeferredExtReceiver::complete()
|
||||||
|
{
|
||||||
|
// also monitor the channel
|
||||||
|
Monitor monitor(_channel);
|
||||||
|
|
||||||
|
// do we have a message?
|
||||||
|
if (_message) _messageCallback(*_message, _deliveryTag, _redelivered);
|
||||||
|
|
||||||
|
// do we have to inform anyone about completion?
|
||||||
|
if (_deliveredCallback) _deliveredCallback(_deliveryTag, _redelivered);
|
||||||
|
|
||||||
|
// for the next iteration we want a new message
|
||||||
|
_message.reset();
|
||||||
|
|
||||||
|
// do we still have a valid channel
|
||||||
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
|
// we are now done executing, so the channel can forget the current receiving object
|
||||||
|
_channel->install(nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
* Implementation of the DeferredGet call
|
* Implementation of the DeferredGet call
|
||||||
*
|
*
|
||||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
* @copyright 2014 - 2017 Copernica BV
|
* @copyright 2014 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -19,23 +19,22 @@ namespace AMQP {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report success for a get operation
|
* Report success for a get operation
|
||||||
*
|
|
||||||
* @param messagecount Number of messages left in the queue
|
* @param messagecount Number of messages left in the queue
|
||||||
* @param deliveryTag Delivery tag of the message coming in
|
* @param deliveryTag Delivery tag of the message coming in
|
||||||
* @param redelivered Was the message redelivered?
|
* @param redelivered Was the message redelivered?
|
||||||
*/
|
*/
|
||||||
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered)
|
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered)
|
||||||
{
|
{
|
||||||
|
// install this object as the handler for the upcoming header and body frames
|
||||||
|
_channel->install(shared_from_this());
|
||||||
|
|
||||||
// store delivery tag and redelivery status
|
// store delivery tag and redelivery status
|
||||||
_deliveryTag = deliveryTag;
|
_deliveryTag = deliveryTag;
|
||||||
_redelivered = redelivered;
|
_redelivered = redelivered;
|
||||||
|
|
||||||
// install ourselves in the channel
|
|
||||||
_channel->install("", shared_from_this(), true);
|
|
||||||
|
|
||||||
// report the size (note that this is the size _minus_ the message that is retrieved
|
// report the size (note that this is the size _minus_ the message that is retrieved
|
||||||
// (and for which the callback will be called later), so it could be zero)
|
// (and for which the callback will be called later), so it could be zero)
|
||||||
if (_sizeCallback) _sizeCallback(messagecount);
|
if (_countCallback) _countCallback(messagecount);
|
||||||
|
|
||||||
// return next handler
|
// return next handler
|
||||||
return _next;
|
return _next;
|
||||||
|
|
@ -48,7 +47,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecoun
|
||||||
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
|
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
|
||||||
{
|
{
|
||||||
// report the size
|
// report the size
|
||||||
if (_sizeCallback) _sizeCallback(0);
|
if (_countCallback) _countCallback(0);
|
||||||
|
|
||||||
// check if a callback was set
|
// check if a callback was set
|
||||||
if (_emptyCallback) _emptyCallback();
|
if (_emptyCallback) _emptyCallback();
|
||||||
|
|
@ -58,27 +57,15 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Announce that a message has been received
|
* Extended implementation of the complete method that is called when a message was fully received
|
||||||
* @param message The message to announce
|
|
||||||
* @param deliveryTag The delivery tag (for ack()ing)
|
|
||||||
* @param redelivered Is this a redelivered message
|
|
||||||
*/
|
*/
|
||||||
void DeferredGet::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
|
void DeferredGet::complete()
|
||||||
{
|
{
|
||||||
// monitor the channel
|
// the channel is now synchronized, delayed frames may now be sent
|
||||||
Monitor monitor{ _channel };
|
|
||||||
|
|
||||||
// the channel is now synchronized
|
|
||||||
_channel->onSynchronized();
|
_channel->onSynchronized();
|
||||||
|
|
||||||
// simply execute the message callback
|
// pass on to normal implementation
|
||||||
_messageCallback(std::move(message), deliveryTag, redelivered);
|
DeferredExtReceiver::complete();
|
||||||
|
|
||||||
// check if the channel is still valid
|
|
||||||
if (!monitor.valid()) return;
|
|
||||||
|
|
||||||
// stop consuming now
|
|
||||||
_channel->uninstall({});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* DeferredPublisher.cpp
|
||||||
|
*
|
||||||
|
* Implementation file for the DeferredPublisher class
|
||||||
|
*
|
||||||
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
#include "includes.h"
|
||||||
|
#include "basicreturnframe.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a return frame
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void DeferredPublisher::process(BasicReturnFrame &frame)
|
||||||
|
{
|
||||||
|
// this object will handle all future frames with header and body data
|
||||||
|
_channel->install(shared_from_this());
|
||||||
|
|
||||||
|
// retrieve the delivery tag and whether we were redelivered
|
||||||
|
_code = frame.replyCode();
|
||||||
|
_description = frame.replyText();
|
||||||
|
|
||||||
|
// notify user space of the begin of the returned message
|
||||||
|
if (_beginCallback) _beginCallback(_code, _description);
|
||||||
|
|
||||||
|
// initialize the object for the next message
|
||||||
|
initialize(frame.exchange(), frame.routingKey());
|
||||||
|
|
||||||
|
// do we have anybody interested in messages? in that case we construct the message
|
||||||
|
if (_bounceCallback) _message.construct(frame.exchange(), frame.routingKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that a message was done
|
||||||
|
*/
|
||||||
|
void DeferredPublisher::complete()
|
||||||
|
{
|
||||||
|
// also monitor the channel
|
||||||
|
Monitor monitor(_channel);
|
||||||
|
|
||||||
|
// do we have a message?
|
||||||
|
if (_message) _bounceCallback(*_message, _code, _description);
|
||||||
|
|
||||||
|
// do we have to inform anyone about completion?
|
||||||
|
if (_completeCallback) _completeCallback();
|
||||||
|
|
||||||
|
// for the next iteration we want a new message
|
||||||
|
_message.reset();
|
||||||
|
|
||||||
|
// the description can be thrown away too
|
||||||
|
_description.clear();
|
||||||
|
|
||||||
|
// do we still have a valid channel
|
||||||
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
|
// we are now done executing, so the channel can forget the current receiving object
|
||||||
|
_channel->install(nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
/**
|
||||||
|
* DeferredReceiver.cpp
|
||||||
|
*
|
||||||
|
* Implementation file for the DeferredReceiver class
|
||||||
|
*
|
||||||
|
* @copyright 2016 - 2018 Copernica B.V.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include "amqpcpp/deferredreceiver.h"
|
||||||
|
#include "basicdeliverframe.h"
|
||||||
|
#include "basicgetokframe.h"
|
||||||
|
#include "basicheaderframe.h"
|
||||||
|
#include "bodyframe.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the object: we are going to receive a message, next frames will be header and data
|
||||||
|
* @param exchange
|
||||||
|
* @param routingkey
|
||||||
|
*/
|
||||||
|
void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey)
|
||||||
|
{
|
||||||
|
// anybody interested in the new message?
|
||||||
|
if (_startCallback) _startCallback(exchange, routingkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the message headers
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void DeferredReceiver::process(BasicHeaderFrame &frame)
|
||||||
|
{
|
||||||
|
// make sure we stay in scope
|
||||||
|
auto self = lock();
|
||||||
|
|
||||||
|
// store the body size
|
||||||
|
_bodySize = frame.bodySize();
|
||||||
|
|
||||||
|
// is user interested in the size?
|
||||||
|
if (_sizeCallback) _sizeCallback(_bodySize);
|
||||||
|
|
||||||
|
// do we have a message?
|
||||||
|
if (_message)
|
||||||
|
{
|
||||||
|
// store the body size and metadata
|
||||||
|
_message->setBodySize(_bodySize);
|
||||||
|
_message->set(frame.metaData());
|
||||||
|
}
|
||||||
|
|
||||||
|
// anybody interested in the headers?
|
||||||
|
if (_headerCallback) _headerCallback(frame.metaData());
|
||||||
|
|
||||||
|
// no body data expected? then we are now complete
|
||||||
|
if (_bodySize == 0) complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the message data
|
||||||
|
*
|
||||||
|
* @param frame The frame to process
|
||||||
|
*/
|
||||||
|
void DeferredReceiver::process(BodyFrame &frame)
|
||||||
|
{
|
||||||
|
// make sure we stay in scope
|
||||||
|
auto self = lock();
|
||||||
|
|
||||||
|
// update the bytes still to receive
|
||||||
|
_bodySize -= frame.payloadSize();
|
||||||
|
|
||||||
|
// anybody interested in the data?
|
||||||
|
if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize());
|
||||||
|
|
||||||
|
// do we have a message? then append the data
|
||||||
|
if (_message) _message->append(frame.payload(), frame.payloadSize());
|
||||||
|
|
||||||
|
// if all bytes were received we are now complete
|
||||||
|
if (_bodySize == 0) complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
@ -68,6 +68,7 @@
|
||||||
#include "amqpcpp/callbacks.h"
|
#include "amqpcpp/callbacks.h"
|
||||||
#include "amqpcpp/deferred.h"
|
#include "amqpcpp/deferred.h"
|
||||||
#include "amqpcpp/deferredconsumer.h"
|
#include "amqpcpp/deferredconsumer.h"
|
||||||
|
#include "amqpcpp/deferredpublisher.h"
|
||||||
#include "amqpcpp/deferredqueue.h"
|
#include "amqpcpp/deferredqueue.h"
|
||||||
#include "amqpcpp/deferreddelete.h"
|
#include "amqpcpp/deferreddelete.h"
|
||||||
#include "amqpcpp/deferredcancel.h"
|
#include "amqpcpp/deferredcancel.h"
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@
|
||||||
* @param argv
|
* @param argv
|
||||||
* @return int
|
* @return int
|
||||||
*/
|
*/
|
||||||
|
/*
|
||||||
int main(int argc, const char *argv[])
|
int main(int argc, const char *argv[])
|
||||||
{
|
{
|
||||||
// iterate over the arguments
|
// iterate over the arguments
|
||||||
|
|
@ -37,3 +38,4 @@ int main(int argc, const char *argv[])
|
||||||
// done
|
// done
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
@ -30,7 +30,7 @@ MyConnection::MyConnection(const std::string &ip) :
|
||||||
_socket(Event::MainLoop::instance(), this)
|
_socket(Event::MainLoop::instance(), this)
|
||||||
{
|
{
|
||||||
// start connecting
|
// start connecting
|
||||||
if (_socket.connect(Network::Ipv4Address(ip), 5672)) return;
|
if (_socket.connect(Dns::IpAddress(ip), 5672)) return;
|
||||||
|
|
||||||
// failure
|
// failure
|
||||||
onFailure(&_socket);
|
onFailure(&_socket);
|
||||||
|
|
@ -96,21 +96,30 @@ void MyConnection::onConnected(Network::TcpSocket *socket)
|
||||||
std::cout << "queue declared" << std::endl;
|
std::cout << "queue declared" << std::endl;
|
||||||
|
|
||||||
// start consuming
|
// start consuming
|
||||||
_channel->consume("my_queue").onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
|
_channel->consume("my_queue").onReceived([this](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
|
||||||
std::cout << "received: " << message.message() << std::endl;
|
std::cout << "consumed from exchange " << message.exchange() << " " << message.routingkey() << ": " << std::string(message.body(), message.bodySize()) << std::endl;
|
||||||
|
_channel->ack(deliveryTag);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// declare an exchange
|
// declare an exchange
|
||||||
_channel->declareExchange().onSuccess([]() {
|
_channel->declareExchange("my_exchange", AMQP::direct).onSuccess([]() {
|
||||||
std::cout << "exchange declared" << std::endl;
|
std::cout << "exchange declared" << std::endl;
|
||||||
});
|
});
|
||||||
|
|
||||||
// bind queue and exchange
|
// bind queue and exchange
|
||||||
_channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() {
|
_channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() {
|
||||||
std::cout << "queue bound to exchange" << std::endl;
|
std::cout << "queue bound to exchange" << std::endl;
|
||||||
|
|
||||||
|
// callback for returns
|
||||||
|
auto callback = [](const AMQP::Message &message, int16_t code, const std::string &description) {
|
||||||
|
|
||||||
_channel->publish("my_exchange", "key", "just a message");
|
std::cout << "message was returned: " << code << " " << description << ": " << std::string(message.body(), message.bodySize()) << std::endl;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
_channel->publish("my_exchange", "key", "just a message", AMQP::mandatory).onReturned(callback);
|
||||||
|
_channel->publish("my_exchange", "unknown key", "just another message", AMQP::mandatory).onReturned(callback);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -156,7 +165,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
|
||||||
if (!_connection) return;
|
if (!_connection) return;
|
||||||
|
|
||||||
// let the data be handled by the connection
|
// let the data be handled by the connection
|
||||||
size_t bytes = _connection->parse(buffer->data(), buffer->size());
|
size_t bytes = _connection->parse(buffer->buffer(), buffer->size());
|
||||||
|
|
||||||
// shrink the buffer
|
// shrink the buffer
|
||||||
buffer->shrink(bytes);
|
buffer->shrink(bytes);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue