Updated README

This commit is contained in:
Martijn Otto 2014-04-29 16:07:43 +02:00
parent a9570277b7
commit 7084d49b13
1 changed files with 74 additions and 78 deletions

152
README.md
View File

@ -86,8 +86,8 @@ class MyConnectionHandler : public AMQP::ConnectionHandler
} }
/** /**
* Method that is called when the connection was closed. This is the * Method that is called when the connection was closed. This is the
* counter part of a call to Connection::close() and it confirms that the * counter part of a call to Connection::close() and it confirms that the
* connection was correctly closed. * connection was correctly closed.
* *
* @param connection The connection that was closed and that is now unusable * @param connection The connection that was closed and that is now unusable
@ -143,12 +143,12 @@ every time that it wants to send out data. We've explained that it is up to you
implement that method. But what about data in the other direction? How does the implement that method. But what about data in the other direction? How does the
library receive data back from RabbitMQ? library receive data back from RabbitMQ?
The AMQP-CPP library does not do any IO by itself and it is therefore of course The AMQP-CPP library does not do any IO by itself and it is therefore of course
also not possible for the library to receive data from a socket. It is again up also not possible for the library to receive data from a socket. It is again up
to you to do this. If, for example, you notice in your event loop that the socket to you to do this. If, for example, you notice in your event loop that the socket
that is connected with the RabbitMQ server becomes readable, you should read out that is connected with the RabbitMQ server becomes readable, you should read out
that socket (for example by using the recv() system call), and pass the received that socket (for example by using the recv() system call), and pass the received
bytes to the AMQP-CPP library. This is done by calling the parse() method in the bytes to the AMQP-CPP library. This is done by calling the parse() method in the
Connection object. Connection object.
The Connection::parse() method gets two parameters, a pointer to a buffer of The Connection::parse() method gets two parameters, a pointer to a buffer of
@ -160,12 +160,12 @@ The code snippet below comes from the Connection.h C++ header file.
* Parse data that was recevied from RabbitMQ * Parse data that was recevied from RabbitMQ
* *
* Every time that data comes in from RabbitMQ, you should call this method to parse * Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data, and let it handle by the AMQP-CPP library. This method returns * the incoming data, and let it handle by the AMQP-CPP library. This method returns
* the number of bytes that were processed. * the number of bytes that were processed.
* *
* If not all bytes could be processed because it only contained a partial frame, * If not all bytes could be processed because it only contained a partial frame,
* you should call this same method later on when more data is available. The * you should call this same method later on when more data is available. The
* AMQP-CPP library does not do any buffering, so it is up to the caller to ensure * AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
* that the old data is also passed in that later call. * that the old data is also passed in that later call.
* *
* @param buffer buffer to decode * @param buffer buffer to decode
@ -178,10 +178,10 @@ size_t parse(char *buffer, size_t size)
} }
```` ````
You should do all the book keeping for the buffer yourselves. If you for example You should do all the book keeping for the buffer yourselves. If you for example
call the Connection::parse() method with a buffer of 100 bytes, and the method call the Connection::parse() method with a buffer of 100 bytes, and the method
returns that only 60 bytes were processed, you should later call the method again, returns that only 60 bytes were processed, you should later call the method again,
with a buffer filled with the remaining 40 bytes. If the method returns 0, you should with a buffer filled with the remaining 40 bytes. If the method returns 0, you should
make a new call to parse() when more data is available, with a buffer that contains make a new call to parse() when more data is available, with a buffer that contains
both the old data, and the new data. both the old data, and the new data.
@ -189,8 +189,8 @@ both the old data, and the new data.
CHANNELS CHANNELS
======== ========
In the example we created a channel object. A channel is a virtual connection over In the example we created a channel object. A channel is a virtual connection over
a single TCP connection, and it is possible to create many channels that all use a single TCP connection, and it is possible to create many channels that all use
the same TCP connection. the same TCP connection.
AMQP instructions are always sent over a channel, so before you can send the first AMQP instructions are always sent over a channel, so before you can send the first
@ -203,21 +203,21 @@ documented.
The constructor of the Channel object accepts one parameter: the connection object. The constructor of the Channel object accepts one parameter: the connection object.
Unlike the connection it does not accept a handler. Instead of a handler object, Unlike the connection it does not accept a handler. Instead of a handler object,
(almost) every method of the Channel class returns an instance of the 'Deferred' (almost) every method of the Channel class returns an instance of the 'Deferred'
class. This object can be used to install handlers that will be called in case class. This object can be used to install handlers that will be called in case
of success or failure. of success or failure.
For example, if you call the channel.declareExchange() method, the AMQP-CPP library For example, if you call the channel.declareExchange() method, the AMQP-CPP library
will send a message to the RabbitMQ message broker to ask it to declare the will send a message to the RabbitMQ message broker to ask it to declare the
queue. However, because all operations in the library are asynchronous, the queue. However, because all operations in the library are asynchronous, the
declareExchange() method can not return 'true' or 'false' to inform you whether declareExchange() method can not return 'true' or 'false' to inform you whether
the operation was succesful or not. Only after a while, after the instruction the operation was succesful or not. Only after a while, after the instruction
has reached the RabbitMQ server, and the confirmation from the server has been has reached the RabbitMQ server, and the confirmation from the server has been
sent back to the client, the library can report the result of the declareExchange() sent back to the client, the library can report the result of the declareExchange()
call. call.
To prevent any blocking calls, the channel.declareExchange() method returns a To prevent any blocking calls, the channel.declareExchange() method returns a
'Deferred' result object, on which you can set callback functions that will be 'Deferred' result object, on which you can set callback functions that will be
called when the operation succeeds or fails. called when the operation succeeds or fails.
````c++ ````c++
@ -249,9 +249,9 @@ run in either case: when the operation succeeds or when it fails.
The signature for the onError() method is always the same: it gets one parameter The signature for the onError() method is always the same: it gets one parameter
with a human readable error message. The onSuccess() function has a different with a human readable error message. The onSuccess() function has a different
signature depending on the method that you call. Most onSuccess() functions signature depending on the method that you call. Most onSuccess() functions
(like the one we showed for the declareExchange() method) do not get any (like the one we showed for the declareExchange() method) do not get any
parameters at all. Some specific onSuccess callbacks receive extra parameters parameters at all. Some specific onSuccess callbacks receive extra parameters
with additional information. with additional information.
@ -287,7 +287,7 @@ myChannel.onReady([]() {
}); });
```` ````
In theory, you should wait for the onReady() callback to be called before you In theory, you should wait for the onReady() callback to be called before you
send any other instructions over the channel. In practice however, the AMQP library send any other instructions over the channel. In practice however, the AMQP library
caches all instructions that were sent too early, so that you can use the caches all instructions that were sent too early, so that you can use the
channel object right after it was constructed. channel object right after it was constructed.
@ -307,10 +307,10 @@ myChannel.declareQueue("my-queue");
myChannel.declareExchange("my-exchange"); myChannel.declareExchange("my-exchange");
```` ````
If the first declareQueue() call fails in the example above, the second If the first declareQueue() call fails in the example above, the second
myChannel.declareExchange() method will not be executed, even when this myChannel.declareExchange() method will not be executed, even when this
second instruction was already sent to the server. The second instruction will be second instruction was already sent to the server. The second instruction will be
ignored by the RabbitMQ server because the channel was already in an invalid ignored by the RabbitMQ server because the channel was already in an invalid
state after the first failure. state after the first failure.
You can overcome this by using multiple channels: You can overcome this by using multiple channels:
@ -322,10 +322,10 @@ channel1.declareQueue("my-queue");
channel2.declareExchange("my-exchange"); channel2.declareExchange("my-exchange");
```` ````
Now, if an error occurs with declaring the queue, it will not have consequences Now, if an error occurs with declaring the queue, it will not have consequences
for the other call. But this comes at a small price: setting up the extra channel for the other call. But this comes at a small price: setting up the extra channel
requires and extra instruction to be sent to the RabbitMQ server, so some extra requires and extra instruction to be sent to the RabbitMQ server, so some extra
bytes are sent over the network, and some additional resources in both the client bytes are sent over the network, and some additional resources in both the client
application and the RabbitMQ server are used (although this is all very limited). application and the RabbitMQ server are used (although this is all very limited).
@ -385,9 +385,9 @@ channel.declareQueue("myQueue").onSuccess(callback);
```` ````
Just like many others methods in the Channel class, the declareQueue() method Just like many others methods in the Channel class, the declareQueue() method
accept an integer parameter named 'flags'. This is a variable in which you can accept an integer parameter named 'flags'. This is a variable in which you can
set method-specific options, by summing up all the options that are described in set method-specific options, by summing up all the options that are described in
the documentation above the method. If you for example want to create a durable, the documentation above the method. If you for example want to create a durable,
auto-deleted queue, you can pass in the value AMQP::durable + AMQP::autodelete. auto-deleted queue, you can pass in the value AMQP::durable + AMQP::autodelete.
The declareQueue() method also accepts a parameter named 'arguments', which is of type The declareQueue() method also accepts a parameter named 'arguments', which is of type
@ -421,7 +421,7 @@ exchange to publish to, the routing key to use, and the actual message that
you're publishing - all these parameters are standard C++ strings. you're publishing - all these parameters are standard C++ strings.
More extended versions of the publish() method exist that accept additional More extended versions of the publish() method exist that accept additional
arguments, and that enable you to publish entire Envelope objects. An envelope arguments, and that enable you to publish entire Envelope objects. An envelope
is an object that contains the message plus a list of optional meta information like is an object that contains the message plus a list of optional meta information like
the content-type, content-encoding, priority, expire time and more. None of these the content-type, content-encoding, priority, expire time and more. None of these
meta fields are interpreted by this library, and also the RabbitMQ ignores most meta fields are interpreted by this library, and also the RabbitMQ ignores most
@ -441,10 +441,10 @@ in almost any form:
* *
* The following flags can be used * The following flags can be used
* *
* - mandatory if set, an unroutable message will be reported to the * - mandatory if set, an unroutable message will be reported to the
* channel handler with the onReturned method * channel handler with the onReturned method
* *
* - immediate if set, a message that could not immediately be consumed * - immediate if set, a message that could not immediately be consumed
* is returned to the onReturned method * is returned to the onReturned method
* *
* If either of the two flags is set, and the message could not immediately * If either of the two flags is set, and the message could not immediately
@ -469,17 +469,17 @@ bool publish(const std::string &exchange, const std::string &routingKey, const c
Published messages are normally not confirmed by the server, and the RabbitMQ Published messages are normally not confirmed by the server, and the RabbitMQ
will not send a report back to inform us whether the message was succesfully will not send a report back to inform us whether the message was succesfully
published or not. Therefore the publish method does also not return a Deferred published or not. Therefore the publish method does also not return a Deferred
object. object.
As long as no error is reported via the Channel::onError() method, you can safely As long as no error is reported via the Channel::onError() method, you can safely
assume that your messages were delivered. assume that your messages were delivered.
This can of course be a problem when you are publishing many messages. If you get This can of course be a problem when you are publishing many messages. If you get
an error halfway through there is no way to know for sure how many messages made an error halfway through there is no way to know for sure how many messages made
it to the broker and how many should be republished. If this is important, you can it to the broker and how many should be republished. If this is important, you can
wrap the publish commands inside a transaction. In this case, if an error occurs, wrap the publish commands inside a transaction. In this case, if an error occurs,
the transaction is automatically rolled back by RabbitMQ and none of the messages the transaction is automatically rolled back by RabbitMQ and none of the messages
are actually published. are actually published.
````c++ ````c++
@ -531,21 +531,17 @@ The full documentation from the C++ Channel.h headerfile looks like this:
* *
* The following flags are supported: * The following flags are supported:
* *
* - nolocal if set, messages published on this channel are * - nolocal if set, messages published on this channel are
* not also consumed * not also consumed
* *
* - noack if set, consumed messages do not have to be acked, * - noack if set, consumed messages do not have to be acked,
* this happens automatically * this happens automatically
* *
* - exclusive request exclusive access, only this consumer can * - exclusive request exclusive access, only this consumer can
* access the queue * access the queue
* *
* - nowait the server does not have to send a response back
* that consuming is active
*
* The method ChannelHandler::onConsumerStarted() will be called when the * The method ChannelHandler::onConsumerStarted() will be called when the
* consumer has started (unless the nowait option was set, in which case * consumer has started.
* no confirmation method is called)
* *
* @param queue the queue from which you want to consume * @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation * @param tag a consumer tag that will be associated with this consume operation
@ -561,14 +557,14 @@ DeferredConsumer &consume(const std::string &queue, int flags = 0);
DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments); DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments);
```` ````
As you can see, the consume method returns a DeferredConsumer. This object is a As you can see, the consume method returns a DeferredConsumer. This object is a
regular Deferred, with additions. The onSuccess() method of a regular Deferred, with additions. The onSuccess() method of a
DeferredConsumer is slightly different than the onSuccess() method of a regular DeferredConsumer is slightly different than the onSuccess() method of a regular
Deferred object: one extra parameter will be supplied to your callback function Deferred object: one extra parameter will be supplied to your callback function
with the consumer tag. with the consumer tag.
The onSuccess() callback will be called when the consume operation _has started_, The onSuccess() callback will be called when the consume operation _has started_,
but not when messages are actually consumed. For this you will have to install but not when messages are actually consumed. For this you will have to install
a different callback, using the onReceived() method. a different callback, using the onReceived() method.
````c++ ````c++
@ -588,7 +584,7 @@ auto errorCb = [](const char *message) {
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "message received" << std::endl; std::cout << "message received" << std::endl;
// acknowledge the message // acknowledge the message
channel.ack(deliveryTag); channel.ack(deliveryTag);
} }
@ -601,33 +597,33 @@ channel.consume("my-queue")
```` ````
The Message object holds all information of the delivered message: the actual The Message object holds all information of the delivered message: the actual
content, all meta information from the envelope (in fact, the Message class is content, all meta information from the envelope (in fact, the Message class is
derived from the Envelope class), and even the name of the exchange and the derived from the Envelope class), and even the name of the exchange and the
routing key that were used when the message was originally published. For a full routing key that were used when the message was originally published. For a full
list of all information in the Message class, you best have a look at the list of all information in the Message class, you best have a look at the
message.h, envelope.h and metadata.h header files. message.h, envelope.h and metadata.h header files.
Another important parameter to the onReceived() method is the deliveryTag parameter. Another important parameter to the onReceived() method is the deliveryTag parameter.
This is a unique identifier that you need to acknowledge an incoming message. This is a unique identifier that you need to acknowledge an incoming message.
RabbitMQ only removes the message after it has been acknowledged, so that if your RabbitMQ only removes the message after it has been acknowledged, so that if your
application crashes while it was busy processing the message, the message does application crashes while it was busy processing the message, the message does
not get lost but remains in the queue. But this means that after you've processed not get lost but remains in the queue. But this means that after you've processed
the message, you must inform RabbitMQ about it by calling the Channel:ack() method. the message, you must inform RabbitMQ about it by calling the Channel:ack() method.
This method is very simple and takes in its simplest form only one parameter: the This method is very simple and takes in its simplest form only one parameter: the
deliveryTag of the message. deliveryTag of the message.
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until Consuming messages is a continuous process. RabbitMQ keeps sending messages, until
you stop the consumer, which can be done by calling the Channel::cancel() method. you stop the consumer, which can be done by calling the Channel::cancel() method.
If you close the channel, or the entire TCP connection, consuming also stops. If you close the channel, or the entire TCP connection, consuming also stops.
RabbitMQ throttles the number of messages that are delivered to you, to prevent RabbitMQ throttles the number of messages that are delivered to you, to prevent
that your application is flooded with messages from the queue, and to spread out that your application is flooded with messages from the queue, and to spread out
the messages over multiple consumers. This is done with a setting called the messages over multiple consumers. This is done with a setting called
quality-of-service (QOS). The QOS setting is a numeric value which holds the number quality-of-service (QOS). The QOS setting is a numeric value which holds the number
of unacknowledged messages that you are allowed to have. RabbitMQ stops sending of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledges messages has reached this additional messages when the number of unacknowledges messages has reached this
limit, and only sends additional messages when an earlier message gets acknowledged. limit, and only sends additional messages when an earlier message gets acknowledged.
To change the QOS, you can simple call Channel::setQos(). To change the QOS, you can simple call Channel::setQos().