2014-04-02 21:40:35 +08:00
|
|
|
#pragma once
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* ChannelImpl.h
|
|
|
|
|
*
|
|
|
|
|
* Extended channel object that is used internally by the library, but
|
|
|
|
|
* that has a private constructor so that it can not be used from outside
|
|
|
|
|
* the AMQP library
|
|
|
|
|
*
|
|
|
|
|
* @copyright 2014 Copernica BV
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Set up namespace
|
|
|
|
|
*/
|
|
|
|
|
namespace AMQP {
|
|
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
/**
|
|
|
|
|
* Forward declarations
|
|
|
|
|
*/
|
|
|
|
|
class ConsumedMessage;
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Class definition
|
|
|
|
|
*/
|
2014-08-20 17:47:16 +08:00
|
|
|
class ChannelImpl : public Watchable, public std::enable_shared_from_this<ChannelImpl>
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
private:
|
|
|
|
|
/**
|
|
|
|
|
* Pointer to the connection
|
2014-01-07 00:15:21 +08:00
|
|
|
* @var ConnectionImpl
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
ConnectionImpl *_connection = nullptr;
|
2014-01-04 19:45:04 +08:00
|
|
|
|
2014-04-08 20:42:07 +08:00
|
|
|
/**
|
|
|
|
|
* Callback when the channel is ready
|
2014-04-15 16:43:33 +08:00
|
|
|
* @var SuccessCallback
|
2014-04-08 20:42:07 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
SuccessCallback _readyCallback;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Callback when the channel errors out
|
2014-04-15 16:43:33 +08:00
|
|
|
* @var ErrorCallback
|
2014-04-08 20:42:07 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
ErrorCallback _errorCallback;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-04-10 18:51:04 +08:00
|
|
|
/**
|
2014-04-15 17:39:52 +08:00
|
|
|
* Callbacks for all consumers that are active
|
|
|
|
|
* @var std::map<std::string,MessageCallback>
|
2014-04-10 18:51:04 +08:00
|
|
|
*/
|
2014-04-15 17:39:52 +08:00
|
|
|
std::map<std::string,MessageCallback> _consumers;
|
2014-04-10 18:51:04 +08:00
|
|
|
|
2014-04-08 20:42:07 +08:00
|
|
|
/**
|
2014-04-15 16:43:33 +08:00
|
|
|
* Pointer to the oldest deferred result (the first one that is going
|
|
|
|
|
* to be executed)
|
2014-04-29 21:51:33 +08:00
|
|
|
*
|
2014-04-15 16:43:33 +08:00
|
|
|
* @var Deferred
|
2014-04-08 20:42:07 +08:00
|
|
|
*/
|
2014-08-19 23:43:12 +08:00
|
|
|
std::shared_ptr<Deferred> _oldestCallback = nullptr;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
/**
|
|
|
|
|
* Pointer to the newest deferred result (the last one to be added).
|
2014-04-29 21:51:33 +08:00
|
|
|
*
|
2014-04-15 16:43:33 +08:00
|
|
|
* @var Deferred
|
|
|
|
|
*/
|
|
|
|
|
Deferred *_newestCallback = nullptr;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* The channel number
|
|
|
|
|
* @var uint16_t
|
|
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
uint16_t _id = 0;
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* State of the channel object
|
|
|
|
|
* @var enum
|
|
|
|
|
*/
|
|
|
|
|
enum {
|
|
|
|
|
state_connected,
|
2014-01-07 00:15:21 +08:00
|
|
|
state_closing,
|
2014-01-04 19:45:04 +08:00
|
|
|
state_closed
|
2014-08-20 18:44:52 +08:00
|
|
|
} _state = state_closed;
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-04-29 21:51:33 +08:00
|
|
|
/**
|
|
|
|
|
* The frames that still need to be send out
|
|
|
|
|
*
|
|
|
|
|
* We store the data as well as whether they
|
|
|
|
|
* should be handled synchronously.
|
2014-08-20 17:47:16 +08:00
|
|
|
*
|
|
|
|
|
* @var std::queue
|
2014-04-29 21:51:33 +08:00
|
|
|
*/
|
|
|
|
|
std::queue<std::pair<bool, OutBuffer>> _queue;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Are we currently operating in synchronous mode?
|
2014-08-20 17:47:16 +08:00
|
|
|
* @var bool
|
2014-04-29 21:51:33 +08:00
|
|
|
*/
|
|
|
|
|
bool _synchronous = false;
|
|
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
|
|
|
|
* The message that is now being received
|
2014-04-15 17:39:52 +08:00
|
|
|
* @var ConsumedMessage
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-15 17:39:52 +08:00
|
|
|
ConsumedMessage *_message = nullptr;
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
2014-08-20 18:44:52 +08:00
|
|
|
* Attach the connection
|
|
|
|
|
* @param connection
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
void attach(Connection *connection);
|
2014-01-04 19:45:04 +08:00
|
|
|
|
2014-04-15 16:43:33 +08:00
|
|
|
/**
|
|
|
|
|
* Push a deferred result
|
|
|
|
|
* @param result The deferred result
|
2014-04-15 19:01:27 +08:00
|
|
|
* @return Deferred The object just pushed
|
2014-04-15 16:43:33 +08:00
|
|
|
*/
|
2014-04-15 19:01:27 +08:00
|
|
|
Deferred &push(Deferred *deferred);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send a framen and push a deferred result
|
|
|
|
|
* @param frame The frame to send
|
|
|
|
|
* @return Deferred The object just pushed
|
|
|
|
|
*/
|
|
|
|
|
Deferred &push(const Frame &frame);
|
2014-04-15 16:43:33 +08:00
|
|
|
|
2014-08-20 17:47:16 +08:00
|
|
|
protected:
|
|
|
|
|
/**
|
|
|
|
|
* Construct a channel object
|
|
|
|
|
*
|
|
|
|
|
* Note that the constructor is private, and that the Channel class is
|
|
|
|
|
* a friend. By doing this we ensure that nobody can instantiate this
|
|
|
|
|
* object, and that it can thus only be used inside the library.
|
|
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
ChannelImpl() {}
|
2014-04-15 16:43:33 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
public:
|
2014-08-29 19:17:17 +08:00
|
|
|
/**
|
|
|
|
|
* Copy'ing of channel objects is not supported
|
|
|
|
|
* @param channel
|
|
|
|
|
*/
|
|
|
|
|
ChannelImpl(const ChannelImpl &channel) = delete;
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Destructor
|
|
|
|
|
*/
|
|
|
|
|
virtual ~ChannelImpl();
|
|
|
|
|
|
2014-01-07 05:34:54 +08:00
|
|
|
/**
|
|
|
|
|
* Invalidate the channel
|
|
|
|
|
* This method is called when the connection is destructed
|
|
|
|
|
*/
|
2014-08-20 18:44:52 +08:00
|
|
|
void detach()
|
2014-01-07 05:34:54 +08:00
|
|
|
{
|
2014-08-20 19:40:29 +08:00
|
|
|
// connection is gone
|
2014-01-07 05:34:54 +08:00
|
|
|
_connection = nullptr;
|
|
|
|
|
}
|
|
|
|
|
|
2014-08-20 19:40:29 +08:00
|
|
|
/**
|
|
|
|
|
* Callback that is called when the channel was succesfully created.
|
|
|
|
|
* @param callback the callback to execute
|
|
|
|
|
*/
|
|
|
|
|
void onReady(const SuccessCallback &callback)
|
|
|
|
|
{
|
|
|
|
|
// store callback
|
|
|
|
|
_readyCallback = callback;
|
|
|
|
|
|
|
|
|
|
// direct call if channel is already ready
|
|
|
|
|
if (_state == state_connected) callback();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Callback that is called when an error occurs.
|
|
|
|
|
*
|
|
|
|
|
* Only one error callback can be registered. Calling this function
|
|
|
|
|
* multiple times will remove the old callback.
|
|
|
|
|
*
|
|
|
|
|
* @param callback the callback to execute
|
|
|
|
|
*/
|
|
|
|
|
void onError(const ErrorCallback &callback)
|
|
|
|
|
{
|
|
|
|
|
// store callback
|
|
|
|
|
_errorCallback = callback;
|
|
|
|
|
|
|
|
|
|
// direct call if channel is already in error state
|
|
|
|
|
if (_state != state_connected) callback("Channel is in error state");
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Pause deliveries on a channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* This will stop all incoming messages
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &pause();
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Resume a paused channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This will resume incoming messages
|
|
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &resume();
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Is the channel connected?
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool connected()
|
|
|
|
|
{
|
|
|
|
|
return _state == state_connected;
|
|
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Start a transaction
|
|
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &startTransaction();
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Commit the current transaction
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &commitTransaction();
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Rollback the current transaction
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &rollbackTransaction();
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* declare an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param name name of the exchange to declare
|
|
|
|
|
* @param type type of exchange
|
|
|
|
|
* @param flags additional settings for the exchange
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* bind two exchanges
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param source exchange which binds to target
|
|
|
|
|
* @param target exchange to bind to
|
|
|
|
|
* @param routingKey routing key
|
|
|
|
|
* @param arguments additional arguments for binding
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* unbind two exchanges
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param source the source exchange
|
|
|
|
|
* @param target the target exchange
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param arguments additional unbind arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* remove an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param name name of the exchange to remove
|
|
|
|
|
* @param flags additional settings for deleting the exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &removeExchange(const std::string &name, int flags);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* declare a queue
|
|
|
|
|
* @param name queue name
|
|
|
|
|
* @param flags additional settings for the queue
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Bind a queue to an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param exchangeName name of the exchange to bind to
|
|
|
|
|
* @param queueName name of the queue
|
|
|
|
|
* @param routingkey routingkey
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Unbind a queue from an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-04 19:45:04 +08:00
|
|
|
* @param exchange the source exchange
|
|
|
|
|
* @param queue the target queue
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param arguments additional bind arguments
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Purge a queue
|
|
|
|
|
* @param queue queue to purge
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
|
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
DeferredDelete &purgeQueue(const std::string &name);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Remove a queue
|
|
|
|
|
* @param queue queue to remove
|
|
|
|
|
* @param flags additional flags
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
|
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
DeferredDelete &removeQueue(const std::string &name, int flags);
|
2014-01-05 04:01:02 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Publish a message to an exchange
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-07 00:15:21 +08:00
|
|
|
* If the mandatory or immediate flag is set, and the message could not immediately
|
|
|
|
|
* be published, the message will be returned to the client, and will eventually
|
2014-04-10 18:51:04 +08:00
|
|
|
* end up in your onReturned() handler method.
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
2014-01-05 04:01:02 +08:00
|
|
|
* @param exchange the exchange to publish to
|
|
|
|
|
* @param routingkey the routing key
|
|
|
|
|
* @param envelope the full envelope to send
|
|
|
|
|
* @param message the message to send
|
|
|
|
|
* @param size size of the message
|
|
|
|
|
*/
|
2014-04-10 18:51:04 +08:00
|
|
|
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-05 21:19:35 +08:00
|
|
|
/**
|
|
|
|
|
* Set the Quality of Service (QOS) of the entire connection
|
|
|
|
|
* @param prefetchCount maximum number of messages to prefetch
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-07-31 16:10:15 +08:00
|
|
|
*
|
|
|
|
|
* @param count number of messages to pre-fetch
|
|
|
|
|
* @param global share count between all consumers on the same channel
|
2014-01-05 21:19:35 +08:00
|
|
|
*/
|
2014-07-31 16:10:15 +08:00
|
|
|
Deferred &setQos(uint16_t prefetchCount, bool global = false);
|
2014-01-05 21:19:35 +08:00
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
|
|
|
|
* Tell the RabbitMQ server that we're ready to consume messages
|
|
|
|
|
* @param queue the queue from which you want to consume
|
|
|
|
|
* @param tag a consumer tag that will be associated with this consume operation
|
|
|
|
|
* @param flags additional flags
|
|
|
|
|
* @param arguments additional arguments
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(AMQP::Channel *channel, const std::string& tag);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Started consuming under tag " << tag << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-10 18:51:04 +08:00
|
|
|
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
|
|
|
|
* Cancel a running consumer
|
|
|
|
|
* @param tag the consumer tag
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback that you can install should have the following signature:
|
|
|
|
|
*
|
2014-07-31 18:58:13 +08:00
|
|
|
* void myCallback(const std::string& tag);
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
2014-07-31 18:58:13 +08:00
|
|
|
* For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) {
|
2014-04-10 18:51:04 +08:00
|
|
|
*
|
|
|
|
|
* std::cout << "Started consuming under tag " << tag << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
DeferredCancel &cancel(const std::string &tag);
|
2014-01-06 04:21:09 +08:00
|
|
|
|
2014-07-31 18:58:13 +08:00
|
|
|
/**
|
|
|
|
|
* Retrieve a single message from RabbitMQ
|
|
|
|
|
*
|
|
|
|
|
* When you call this method, you can get one single message from the queue (or none
|
|
|
|
|
* at all if the queue is empty). The deferred object that is returned, should be used
|
|
|
|
|
* to install a onEmpty() and onSuccess() callback function that will be called
|
|
|
|
|
* when the message is consumed and/or when the message could not be consumed.
|
|
|
|
|
*
|
|
|
|
|
* The following flags are supported:
|
|
|
|
|
*
|
|
|
|
|
* - noack if set, consumed messages do not have to be acked, this happens automatically
|
|
|
|
|
*
|
|
|
|
|
* @param queue name of the queue to consume from
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
*
|
|
|
|
|
* The object returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onEmpty(), onError() and onFinalize() methods.
|
|
|
|
|
*
|
|
|
|
|
* The onSuccess() callback has the following signature:
|
|
|
|
|
*
|
|
|
|
|
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
|
|
|
|
|
*
|
|
|
|
|
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Message fetched" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* }).onEmpty([]() {
|
|
|
|
|
*
|
|
|
|
|
* std::cout << "Queue is empty" << std::endl;
|
|
|
|
|
*
|
|
|
|
|
* });
|
|
|
|
|
*/
|
|
|
|
|
DeferredGet &get(const std::string &queue, int flags = 0);
|
|
|
|
|
|
2014-01-06 04:21:09 +08:00
|
|
|
/**
|
2014-04-10 18:51:04 +08:00
|
|
|
* Acknowledge a message
|
2014-01-06 04:21:09 +08:00
|
|
|
* @param deliveryTag the delivery tag
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool ack(uint64_t deliveryTag, int flags);
|
2014-01-06 21:28:58 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Reject a message
|
|
|
|
|
* @param deliveryTag the delivery tag
|
|
|
|
|
* @param flags optional flags
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool reject(uint64_t deliveryTag, int flags);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-06 21:38:48 +08:00
|
|
|
/**
|
|
|
|
|
* Recover messages that were not yet ack'ed
|
|
|
|
|
* @param flags optional flags
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-06 21:38:48 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &recover(int flags);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Close the current channel
|
2014-04-08 20:42:07 +08:00
|
|
|
*
|
|
|
|
|
* This function returns a deferred handler. Callbacks can be installed
|
|
|
|
|
* using onSuccess(), onError() and onFinalize() methods.
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-15 16:43:33 +08:00
|
|
|
Deferred &close();
|
2014-01-04 19:45:04 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the channel we're working on
|
|
|
|
|
* @return uint16_t
|
|
|
|
|
*/
|
|
|
|
|
const uint16_t id() const
|
|
|
|
|
{
|
|
|
|
|
return _id;
|
|
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Send a frame over the channel
|
|
|
|
|
* @param frame frame to send
|
2014-01-07 00:15:21 +08:00
|
|
|
* @return bool was frame succesfully sent?
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-01-07 00:15:21 +08:00
|
|
|
bool send(const Frame &frame);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-04-29 21:51:33 +08:00
|
|
|
/**
|
2014-08-20 17:47:16 +08:00
|
|
|
* Is this channel waiting for an answer before it can send furher instructions
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
bool waiting() const
|
|
|
|
|
{
|
2014-08-20 19:40:29 +08:00
|
|
|
return _synchronous || !_queue.empty();
|
2014-08-20 17:47:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Signal the channel that a synchronous operation was completed.
|
|
|
|
|
* After this operation, waiting frames can be sent out.
|
2014-04-29 21:51:33 +08:00
|
|
|
*/
|
2014-08-20 17:47:16 +08:00
|
|
|
void onSynchronized();
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-08 20:42:07 +08:00
|
|
|
/**
|
2014-04-08 22:12:04 +08:00
|
|
|
* Report to the handler that the channel is opened
|
2014-04-08 20:42:07 +08:00
|
|
|
*/
|
2014-04-08 22:12:04 +08:00
|
|
|
void reportReady()
|
|
|
|
|
{
|
2014-04-29 21:51:33 +08:00
|
|
|
// callbacks could destroy us, so monitor it
|
|
|
|
|
Monitor monitor(this);
|
|
|
|
|
|
2014-04-08 22:12:04 +08:00
|
|
|
// inform handler
|
2014-04-15 16:43:33 +08:00
|
|
|
if (_readyCallback) _readyCallback();
|
2014-04-29 21:51:33 +08:00
|
|
|
|
|
|
|
|
// if the monitor is still valid, we exit synchronous mode now
|
2014-08-20 17:47:16 +08:00
|
|
|
if (monitor.valid()) onSynchronized();
|
2014-04-08 22:12:04 +08:00
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Report to the handler that the channel is closed
|
2014-04-29 21:51:33 +08:00
|
|
|
*
|
|
|
|
|
* Returns whether the channel object is still valid
|
2014-08-20 17:47:16 +08:00
|
|
|
*
|
|
|
|
|
* @return bool
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-29 21:51:33 +08:00
|
|
|
bool reportClosed()
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
|
|
|
|
// change state
|
|
|
|
|
_state = state_closed;
|
2014-08-20 17:47:16 +08:00
|
|
|
_synchronous = false;
|
|
|
|
|
|
|
|
|
|
// create a monitor, because the callbacks could destruct the current object
|
|
|
|
|
Monitor monitor(this);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-04-15 19:22:06 +08:00
|
|
|
// and pass on to the reportSuccess() method which will call the
|
|
|
|
|
// appropriate deferred object to report the successful operation
|
2014-08-20 17:47:16 +08:00
|
|
|
bool result = reportSuccess();
|
|
|
|
|
|
|
|
|
|
// leap out if object no longer exists
|
|
|
|
|
if (!monitor.valid()) return result;
|
|
|
|
|
|
|
|
|
|
// all later deferred objects should report an error, because it
|
|
|
|
|
// was not possible to complete the instruction as the channel is
|
|
|
|
|
// now closed
|
|
|
|
|
reportError("Channel has been closed", false);
|
|
|
|
|
|
|
|
|
|
// done
|
|
|
|
|
return result;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
2014-04-08 20:42:07 +08:00
|
|
|
* Report success
|
|
|
|
|
*
|
2014-04-29 21:51:33 +08:00
|
|
|
* Returns whether the channel object is still valid
|
2014-08-20 17:47:16 +08:00
|
|
|
*
|
|
|
|
|
* @param mixed
|
|
|
|
|
* @return bool
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-04-08 22:12:04 +08:00
|
|
|
template <typename... Arguments>
|
2014-04-29 21:51:33 +08:00
|
|
|
bool reportSuccess(Arguments ...parameters)
|
2014-01-04 19:45:04 +08:00
|
|
|
{
|
2014-04-15 16:43:33 +08:00
|
|
|
// skip if there is no oldest callback
|
2014-04-29 21:51:33 +08:00
|
|
|
if (!_oldestCallback) return true;
|
|
|
|
|
|
2014-04-15 18:25:56 +08:00
|
|
|
// we are going to call callbacks that could destruct the channel
|
|
|
|
|
Monitor monitor(this);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-08-19 23:43:12 +08:00
|
|
|
// copy the callback (so that it will not be destructed during
|
|
|
|
|
// the "reportSuccess" call
|
|
|
|
|
auto cb = _oldestCallback;
|
|
|
|
|
|
2014-04-15 18:25:56 +08:00
|
|
|
// call the callback
|
2014-08-19 23:43:12 +08:00
|
|
|
auto *next = cb->reportSuccess(std::forward<Arguments>(parameters)...);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 18:25:56 +08:00
|
|
|
// leap out if channel no longer exists
|
2014-04-29 21:51:33 +08:00
|
|
|
if (!monitor.valid()) return false;
|
|
|
|
|
|
2014-04-15 18:25:56 +08:00
|
|
|
// set the oldest callback
|
|
|
|
|
_oldestCallback.reset(next);
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 18:25:56 +08:00
|
|
|
// if there was no next callback, the newest callback was just used
|
|
|
|
|
if (!next) _newestCallback = nullptr;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
|
|
|
|
// we are still valid
|
|
|
|
|
return true;
|
2014-01-04 19:45:04 +08:00
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* Report an error message on a channel
|
2014-04-16 18:04:44 +08:00
|
|
|
* @param message the error message
|
|
|
|
|
* @param notifyhandler should the channel-wide handler also be called?
|
2014-01-04 19:45:04 +08:00
|
|
|
*/
|
2014-08-20 17:47:16 +08:00
|
|
|
void reportError(const char *message, bool notifyhandler = true);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
/**
|
|
|
|
|
* Install a consumer callback
|
|
|
|
|
* @param consumertag The consumer tag
|
|
|
|
|
* @param callback The callback to be called
|
|
|
|
|
*/
|
|
|
|
|
void install(const std::string &consumertag, const MessageCallback &callback)
|
|
|
|
|
{
|
|
|
|
|
// install the callback if it is assigned
|
|
|
|
|
if (callback) _consumers[consumertag] = callback;
|
2014-04-29 21:51:33 +08:00
|
|
|
|
2014-04-15 17:39:52 +08:00
|
|
|
// otherwise we erase the previously set callback
|
|
|
|
|
else _consumers.erase(consumertag);
|
|
|
|
|
}
|
|
|
|
|
|
2014-04-15 18:36:11 +08:00
|
|
|
/**
|
|
|
|
|
* Uninstall a consumer callback
|
|
|
|
|
* @param consumertag The consumer tag
|
|
|
|
|
*/
|
|
|
|
|
void uninstall(const std::string &consumertag)
|
|
|
|
|
{
|
|
|
|
|
// erase the callback
|
|
|
|
|
_consumers.erase(consumertag);
|
|
|
|
|
}
|
|
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
2014-01-06 04:21:09 +08:00
|
|
|
* Report that a message was received
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-01-06 22:49:31 +08:00
|
|
|
void reportMessage();
|
2014-01-06 01:50:41 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create an incoming message
|
|
|
|
|
* @param frame
|
2014-04-15 17:39:52 +08:00
|
|
|
* @return ConsumedMessage
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-15 17:39:52 +08:00
|
|
|
ConsumedMessage *message(const BasicDeliverFrame &frame);
|
2014-07-31 18:58:13 +08:00
|
|
|
ConsumedMessage *message(const BasicGetOKFrame &frame);
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-06 01:50:41 +08:00
|
|
|
/**
|
|
|
|
|
* Retrieve the current incoming message
|
2014-04-15 17:39:52 +08:00
|
|
|
* @return ConsumedMessage
|
2014-01-06 01:50:41 +08:00
|
|
|
*/
|
2014-04-15 17:39:52 +08:00
|
|
|
ConsumedMessage *message()
|
2014-01-06 01:50:41 +08:00
|
|
|
{
|
|
|
|
|
return _message;
|
|
|
|
|
}
|
2014-04-08 20:42:07 +08:00
|
|
|
|
2014-01-04 19:45:04 +08:00
|
|
|
/**
|
|
|
|
|
* The channel class is its friend, thus can it instantiate this object
|
|
|
|
|
*/
|
|
|
|
|
friend class Channel;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* End of namespace
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
|