From 0ee6348ddc115c7c189d347b27b23a1501263d34 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 7 Jan 2014 09:41:16 +0100 Subject: [PATCH] Update README.md --- README.md | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8a5328e..c306bf6 100644 --- a/README.md +++ b/README.md @@ -404,7 +404,117 @@ seem to be supported by RabbitMQ. CONSUMING MESSAGES ================== -To be added soon +Fetching messages from RabbitMQ is called consuming, and can be started by calling +the method Channel::consume(). After you've called this method, RabbitMQ starts +delivering messages to you. + +Just like the publish() method that we just described, the consume() method also +comes in many forms. They all have in common that the first argument should be +the name of the queue you like to consume from. The subsequent parameters are an +optional consumer tag, flags and a table with custom arguments. The first additional +parameter, the consumer tag, is nothing more than a string identifier that will be +passed with every consumed message. This can be useful if you call the consume() +methods a number of times to consume from multiple queues, and you would like to know +from which consume call the received messages came. + +The full documentation from the C++ Channel.h headerfile looks like this: + +````c++ +/** + * Tell the RabbitMQ server that we're ready to consume messages + * + * After this method is called, RabbitMQ starts delivering messages to the client + * application. The consume tag is a string identifier that will be passed to + * each received message, so that you can associate incoming messages with a + * consumer. If you do not specify a consumer tag, the server will assign one + * for you. + * + * The following flags are supported: + * + * - nolocal if set, messages published on this channel are not also consumed + * - noack if set, consumed messages do not have to be acked, this happens automatically + * - exclusive request exclusive access, only this consumer can access the queue + * - nowait the server does not have to send a response back that consuming is active + * + * The method ChannelHandler::onConsumerStarted() will be called when the + * consumer has started (unless the nowait option was set, in which case + * no confirmation method is called) + * + * @param queue the queue from which you want to consume + * @param tag a consumer tag that will be associated with this consume operation + * @param flags additional flags + * @param arguments additional arguments + * @return bool + */ +bool consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments); +bool consume(const std::string &queue, const std::string &tag, int flags = 0); +bool consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments); +bool consume(const std::string &queue, int flags, const AMQP::Table &arguments); +bool consume(const std::string &queue, int flags = 0); +bool consume(const std::string &queue, const AMQP::Table &arguments); +```` + +In your ChannelHandler you can override the onConsumerStarted() method, that will be +first called before any messages are sent to you. Most users choose not to override this +method, because there is not much useful to do in it. After the consumer has started, however, +messages are starting to be sent from RabbitMQ to your client application, and they are all +passed to the ChannelHandler::onReceived() method. This method is thus very important to implement. + +````c++ +class MyChannelHandler : public AMQP::ChannelHandler +{ + /** + * Method that is called when a message has been received on a channel + * This message will be called for every message that is received after + * you started consuming. Make sure you acknowledge the messages when its + * safe to remove them from RabbitMQ (unless you set no-ack option when you + * started the consumer) + * @param channel the channel on which the consumer was started + * @param message the consumed message + * @param deliveryTag the delivery tag, you need this to acknowledge the message + * @param consumerTag the consumer identifier that was used to retrieve this message + * @param redelivered is this a redelivered message? + */ + virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) + { + // @todo + // add your own processing + + + // after the message was processed, acknowledge it + channel->ack(deliveryTag); + } +} +```` + +The Message object holds all information of the delivered message: the actual content, +all meta information from the envelope (in fact, the Message class is derived from the Envelope class), +and even then name of the exchange and the routing key that were used when the message was originally +published. For a full list of all information in the Message class, you best have a look at the +message.h, envelope.h and metadata.h header files. + +Another important parameter to the onReceived() method is the deliveryTag parameter. This is a +unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the +message after it has been acknowledged, so that if your application crashes while it was busy +processing the message, the message does not get lost but remains in the queue. But this means that +after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack() +method. This method is very simply and takes in its simplest form only one parameter: the +deliveryTag of the message. + +The consumerTag that you see in the onReceived method() is the same string identifier that was +passed to the Channel::consume() method. + +Consuming messages is a continuous process. RabbitMQ keeps sending messages, until you stop +the consumer, which can be done by calling the Channel::cancel() method. If you close the channel, +or the entire TCP connection, consuming also stops. + +RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application +is flooded with messages from the queue, and to spread out the messages over multiple consumers. +This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which +holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending +additional messages when the number of unacknowledges messages has reached this limit, and only +sends additional messages when an earlier message gets acknowledged. To change the QOS, you can +simple call Channel::setQos(). WORK IN PROGRESS