Update README.md

This commit is contained in:
Emiel Bruijntjes 2014-01-07 09:41:16 +01:00
parent 1a6f7d9796
commit 0ee6348ddc
1 changed files with 111 additions and 1 deletions

112
README.md
View File

@ -404,7 +404,117 @@ seem to be supported by RabbitMQ.
CONSUMING MESSAGES
==================
To be added soon
Fetching messages from RabbitMQ is called consuming, and can be started by calling
the method Channel::consume(). After you've called this method, RabbitMQ starts
delivering messages to you.
Just like the publish() method that we just described, the consume() method also
comes in many forms. They all have in common that the first argument should be
the name of the queue you like to consume from. The subsequent parameters are an
optional consumer tag, flags and a table with custom arguments. The first additional
parameter, the consumer tag, is nothing more than a string identifier that will be
passed with every consumed message. This can be useful if you call the consume()
methods a number of times to consume from multiple queues, and you would like to know
from which consume call the received messages came.
The full documentation from the C++ Channel.h headerfile looks like this:
````c++
/**
* Tell the RabbitMQ server that we're ready to consume messages
*
* After this method is called, RabbitMQ starts delivering messages to the client
* application. The consume tag is a string identifier that will be passed to
* each received message, so that you can associate incoming messages with a
* consumer. If you do not specify a consumer tag, the server will assign one
* for you.
*
* The following flags are supported:
*
* - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
*
* The method ChannelHandler::onConsumerStarted() will be called when the
* consumer has started (unless the nowait option was set, in which case
* no confirmation method is called)
*
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*/
bool consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
bool consume(const std::string &queue, const std::string &tag, int flags = 0);
bool consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
bool consume(const std::string &queue, int flags, const AMQP::Table &arguments);
bool consume(const std::string &queue, int flags = 0);
bool consume(const std::string &queue, const AMQP::Table &arguments);
````
In your ChannelHandler you can override the onConsumerStarted() method, that will be
first called before any messages are sent to you. Most users choose not to override this
method, because there is not much useful to do in it. After the consumer has started, however,
messages are starting to be sent from RabbitMQ to your client application, and they are all
passed to the ChannelHandler::onReceived() method. This method is thus very important to implement.
````c++
class MyChannelHandler : public AMQP::ChannelHandler
{
/**
* Method that is called when a message has been received on a channel
* This message will be called for every message that is received after
* you started consuming. Make sure you acknowledge the messages when its
* safe to remove them from RabbitMQ (unless you set no-ack option when you
* started the consumer)
* @param channel the channel on which the consumer was started
* @param message the consumed message
* @param deliveryTag the delivery tag, you need this to acknowledge the message
* @param consumerTag the consumer identifier that was used to retrieve this message
* @param redelivered is this a redelivered message?
*/
virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)
{
// @todo
// add your own processing
// after the message was processed, acknowledge it
channel->ack(deliveryTag);
}
}
````
The Message object holds all information of the delivered message: the actual content,
all meta information from the envelope (in fact, the Message class is derived from the Envelope class),
and even then name of the exchange and the routing key that were used when the message was originally
published. For a full list of all information in the Message class, you best have a look at the
message.h, envelope.h and metadata.h header files.
Another important parameter to the onReceived() method is the deliveryTag parameter. This is a
unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the
message after it has been acknowledged, so that if your application crashes while it was busy
processing the message, the message does not get lost but remains in the queue. But this means that
after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack()
method. This method is very simply and takes in its simplest form only one parameter: the
deliveryTag of the message.
The consumerTag that you see in the onReceived method() is the same string identifier that was
passed to the Channel::consume() method.
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until you stop
the consumer, which can be done by calling the Channel::cancel() method. If you close the channel,
or the entire TCP connection, consuming also stops.
RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application
is flooded with messages from the queue, and to spread out the messages over multiple consumers.
This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which
holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledges messages has reached this limit, and only
sends additional messages when an earlier message gets acknowledged. To change the QOS, you can
simple call Channel::setQos().
WORK IN PROGRESS