From c64a7b7251a3cbf4790d8c418ce8a07843f1d844 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 6 Jan 2014 18:19:49 +0100 Subject: [PATCH] Update README.md --- README.md | 173 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 136 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 989954a..388f04a 100644 --- a/README.md +++ b/README.md @@ -107,21 +107,34 @@ channel.bindQueue("my-exchange", "my-queue"); A number of remarks about the example above. First you may have noticed that we've created all objects on the stack. You are of course also free to create them -on the heap with the C++ operator 'new'. That works just as good. +on the heap with the C++ operator 'new'. That works just as good, and is in real +life code probably more useful as you normally want to keep your handlers, connection +and channel objects around for a much longer time. But more importantly, you can see in the example above that we have created the channel object directly after we made the connection object, and we also -started declaring exchanges and queues right away. It would have been better if -we had waited for the connection to be ready, and create the channel object -inside the onConnected() method in the MyConnectionHandler class. But this is -not strictly necessary. The methods that are called before the connection is -ready are cached by the AMQP library and will be executed the moment the +started declaring exchanges and queues right away. However, under the hood, a handshake +protocol is executed between the server and the client when the Connection +object is first created. During this handshake procedure it is not permitted to send +other instructions (like opening a channel or declaring a queue). It would therefore have been better +if we had first waited for the connection to be ready (implement the MyConnectionHandler::onConnected() method), +and create the channel object only then. But this is not strictly necessary. +The methods that are called before the handshake is completed are cached by the +AMQP library and will be executed the moment the handshake is completed and the connection becomes ready for use. -As we've explained above, the AMQP library does not do any IO by itself and when it -needs to send data to RabbitMQ, it will call the onData() method in the handler -object. It is of course also not possible for the library to receive data from -the server. It is again up to you to to this. If, for example, you notice in your + +PARSING INCOMING DATA +===================== + +The ConnectionHandler class has a method onData() that is called by the library +every time that it wants to send out data. We've explained that it is up to you to +implement that method. But what about data in the other direction? How does the +library receive data back from RabbitMQ? + +As we've explained above, the AMQP library does not do any IO by itself +and it is therefore of course also not possible for the library to receive data from +a socket. It is again up to you to to this. If, for example, you notice in your event loop that the socket that is connected with the RabbitMQ server becomes readable, you should read out that data (for example by using the recv() system call), and pass the received bytes to the AMQP library. This is done by @@ -154,31 +167,102 @@ size_t parse(char *buffer, size_t size) } ```` -The channel object has many methods to declare queues and exchanges, to bind -and unbind them, to publish and consume messages - and more. You can best take a -look in the channel.h C++ header file for a list of all available methods. Every -method in it is well documented. +CHANNELS +======== + +In the example above you saw that we created two objects: a Connection object, and +on top of that object a Channel object. A channel is a sort of virtual connection +over a single TCP connection, and you can create many channels that all use the +same TCP connection. + +AMQP instructions are always sent over a channel, so before you can send the first +command to the RabbitMQ server, you first need a channel object. The channel +object has many methods to send instructions to the RabbitMQ server. It for +example has methods to declare queues and exchanges, to bind and unbind them, +and to publish and consume messages. You can best take a look at the channel.h +C++ header file for a list of all available methods. Every method in it is well +documented. The constructor of the Channel object gets two parameters: the connection object, -and a pointer to a ChannelHandler object. In the first example that we gave we have +and a pointer to a ChannelHandler object. In the example that we gave above we have not yet used this ChannelHandler object. However, in normal circumstances when you construct a Channel object, you should also pass a pointer to a ChannelHandler object. Just like the ConnectionHandler class, the ChannelHandler class is a base class that you should extend and override the virtual methods that you need. The AMQP library will call these methods to inform you that an operation has succeeded or has failed. + For example, if you call the channel.declareQueue() method, the AMQP library will send a message to the RabbitMQ message broker to ask it to declare the -queue, and return true to indicate that the message has been sent. However, this -does not mean that the queue has succesfully been declared. This is only known -after the server has sent back a message to the client to report whether the -queue was succesfully created or not. When this answer is received, the AMQP library -will call the method ChannelHandler::onQueueDeclared() method - which you can -override in your ChannelHandler object. +queue. However, because all operations in the library are asynchronous, the +declareQueue() method immediately returns 'true', but it is +then not yet known if the queue was correctly declared. Only after a while, +when the confirmation from the server was received, your ChannelHandler::onQueueDeclared() +method will be called to inform you that the operation was succesful. -All methods in the base ChannelHandler class have a default empty implementation, -so you do not have to implement all of them - only the ones that you are interested -in. +Something makes channels a little inconvenient. When an error occurs, the error +is reported back to the client and ends up in your ChannelHandler::onError() +method (which is nice), and on top of that the entire channel is closed +(which is not so nice). This means that if you call multiple methods in a row, +and the first method fails, all subsequent methods will not be executed either: + +````c++ +Channel myChannel(connection, &myHandler); +myChannel.declareQueue("queue-1"); +myChannel.declareQueue("queue-2"); +myChannel.declareQueue("queue-3"); +```` + +If the first declareQueue() call fails in the example above, your ChannelHandler::onError() +method will be called once, and the second and third queues would not be created. This can +be solved by using multiple channels: + +````c++ +Channel channel1(connection, &myHandler); +Channel channel2(connection, &myHandler); +Channel channel3(connection, &myHandler); +channel1.declareQueue("queue-1"); +channel2.declareQueue("queue-2"); +channel3.declareQueue("queue-3"); +```` + +Now, if an error occurs with declaring the first queue, it will not have +consequences for the other two calls. But this workaround comes at a small price: +setting up the extra channels require extra instructions to be sent to the +RabbitMQ server, so the network becomes busier (although not much). + +Another solution would be to write a handler that only creates the second and +third queue after the earlier queue was succesfully created: + +````c++ +class MyHandler : public AMQP::ChannelHandler +{ +public: + virtual void onQueueDeclared(Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) + { + if (name == "queue-1") channel->declareQueue("queue-2"); + if (name == "queue-2") channel->declareQueue("queue-3"); + } +}; + +... + +MyHandler myHandler; +Channel myChannel(connection, &myHandler); +myChannel.declareQueue("queue-1"); + +```` + +But this also has its price: your program now has to wait for the first queue +to be created, before the second instruction can be sent. This is even slower +than the first workaround (in which we set up a different channel for each and +every instruction). + +Let's get back to the ChannelHandler class. It has many methods that you can all +implement - but you do not have to that. All methods in it have a default empty implementation, +so you can only override the ones that you are interested in. When you're writing a +consumer application for example, you probably are only interested in errors that +occur, and in incoming messages: ````c++ #include @@ -200,20 +284,28 @@ public: } /** - * Method that is called when a queue has been declared - * @param channel the channel via which the queue was declared - * @param name name of the queue - * @param messageCount number of messages in queue - * @param consumerCount number of active consumers + * Method that is called when a message has been received on a channel + * This message will be called for every message that is received after + * you started consuming. Make sure you acknowledge the messages when its + * safe to remove them from RabbitMQ (unless you set no-ack option when you + * started the consumer) + * @param channel the channel on which the consumer was started + * @param message the consumed message + * @param deliveryTag the delivery tag, you need this to acknowledge the message + * @param consumerTag the consumer identifier that was used to retrieve this message + * @param redelivered is this a redelivered message? */ - virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) + virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) { // @todo - // do something with the information that cam back, or start using the queue + // do something with the incoming message } }; ```` +FLAGS AND TABLES +================ + Let's take a closer look at one of the methods in the Channel object to explain two other concepts of this AMQP library: flags and tables. The method that we will be looking at is the Channel::declareQueue() method: @@ -244,11 +336,11 @@ want to create a durable, auto-deleted queue, you should pass in the value AMQP::durable + AMQP::autodelete. The declareQueue() method also accepts an arguments parameter, which is of type -Table. The Table object can be used as an associative array to send additional +Table. This Table object can be used as an associative array to send additional options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP -standard. It even is possible to build complicated, nested, table structures full -of strings, arrays and other tables. In reality, you probably only need strings -and integers: +standard. The Table class is so powerful that it is even possible to build +complicated, nested structures full of strings, arrays and even other nested +tables. In reality, you probably only need strings and integers: ````c++ // custom options that are passed to the declareQueue call @@ -261,6 +353,14 @@ arguments["x-expires"] = 7200 * 1000; channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, arguments); ```` +PUBLISHING MESSAGES +=================== + + +CONSUMING MESSAGES +================== + + WORK IN PROGRESS ================ @@ -270,8 +370,7 @@ need additional attention: - ability to set up secure connections (or is this fully done on the IO level) - login with other protocols than login/password - publish confirms - - closing down the connection - + We also need to add more safety checks so that strange or invalid data from RabbitMQ does not break the library (although in reality RabbitMQ only sends valid data). Also, when we now receive an answer from RabbitMQ that does not