AMQP-CPP/README.md

506 lines
24 KiB
Markdown
Raw Normal View History

2014-01-07 15:48:37 +08:00
AMQP-CPP
========
2014-01-04 20:28:19 +08:00
2014-04-10 19:50:24 +08:00
AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The
library can be used to parse incoming data from a RabbitMQ server, and to
2014-01-04 20:28:19 +08:00
generate frames that can be sent to a RabbitMQ server.
2014-01-07 15:48:37 +08:00
Unlike all other AMQP libraries, this AMQP-CPP library does not make a connection to
2014-04-10 19:50:24 +08:00
RabbitMQ by itself, nor does it create sockets and/or performs IO operations. As
a user of this library, you first need to set up a socket connection
to RabbitMQ by yourself, and implement a certain interface that you pass to the
2014-01-07 15:48:37 +08:00
AMQP-CPP library and that the library will use for IO operations.
2014-01-04 20:28:19 +08:00
This architecture makes the library extremely flexible: it does not rely on
operating system specific IO calls, and it can be easily integrated into any
2014-04-10 19:50:24 +08:00
event loop. It is fully asynchronous and does not do any blocking (system) calls,
2014-01-07 01:22:14 +08:00
so it can be used in high performance applications without the need for threads.
2014-01-05 01:42:58 +08:00
ABOUT
=====
2014-04-10 19:50:24 +08:00
This library is created and maintained by Copernica (www.copernica.com), and is
used inside the MailerQ (www.mailerq.com) application, MailerQ is a tool for
2014-01-05 01:42:58 +08:00
sending large volumes of email, using AMQP message queues.
2014-01-04 20:28:19 +08:00
HOW TO USE
==========
2014-01-06 15:30:33 +08:00
As we mentioned above, the library does not do any IO by itself, and you need
to pass an object to the library that the library can use for that. So, before
you can even start using the library, you first you need to create a class that
extends from the ConnectionHandler base class. This is a class
2014-01-04 20:28:19 +08:00
with a number of methods that are called by the library every time it wants
to send out data, or when it needs to inform you that an error occured.
````c++
2014-01-07 15:48:37 +08:00
#include <amqpcpp.h>
2014-01-04 20:28:19 +08:00
class MyConnectionHandler : public AMQP::ConnectionHandler
{
/**
* Method that is called by the AMQP library every time it has data
2014-04-10 19:50:24 +08:00
* available that should be sent to RabbitMQ.
* @param connection pointer to the main connection object
2014-01-04 20:28:19 +08:00
* @param data memory buffer with the data that should be sent to RabbitMQ
* @param size size of the buffer
*/
virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
{
2014-04-10 19:50:24 +08:00
// @todo
2014-01-04 20:28:19 +08:00
// Add your own implementation, for example by doing a call to the
// send() system call. But be aware that the send() call may not
// send all data at once, so you also need to take care of buffering
2014-04-10 19:50:24 +08:00
// the bytes that could not immediately be sent, and try to send
2014-01-04 20:28:19 +08:00
// them again when the socket becomes writable again
}
2014-04-10 19:50:24 +08:00
2014-01-04 20:28:19 +08:00
/**
2014-04-10 19:50:24 +08:00
* Method that is called by the AMQP library when the login attempt
* succeeded. After this method has been called, the connection is ready
2014-01-04 20:28:19 +08:00
* to use.
* @param connection The connection that can now be used
*/
2014-03-20 23:17:47 +08:00
virtual void onConnected(AMQP::Connection *connection)
2014-01-04 20:28:19 +08:00
{
// @todo
2014-04-10 19:50:24 +08:00
// add your own implementation, for example by creating a channel
2014-01-04 20:28:19 +08:00
// instance, and start publishing or consuming
}
2014-04-10 19:50:24 +08:00
2014-01-04 20:28:19 +08:00
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized.
* @param connection The connection on which the error occured
* @param message A human readable error message
*/
2014-03-20 23:17:47 +08:00
virtual void onError(AMQP::Connection *connection, const std::string &message)
2014-01-04 20:28:19 +08:00
{
// @todo
// add your own implementation, for example by reporting the error
2014-04-10 19:50:24 +08:00
// to the user of your program, log the error, and destruct the
2014-01-04 20:28:19 +08:00
// connection object because it is no longer in a usable state
}
};
````
After you've implemented the ConnectionHandler class, you can start using
the library by creating a Connection object, and one or more Channel objects:
````c++
// create an instance of your own connection handler
MyConnectionHandler myHandler;
// create a AMQP connection object
AMQP::Connection connection(&myHandler, Login("guest","guest"), "/");
// and create a channel
AMQP::Channel channel(&connection);
// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue");
````
2014-01-05 01:20:45 +08:00
A number of remarks about the example above. First you may have noticed that we've
created all objects on the stack. You are of course also free to create them
on the heap with the C++ operator 'new'. That works just as well, and is in real
2014-04-10 19:50:24 +08:00
life code probably more useful as you normally want to keep your handlers, connection
2014-01-07 01:19:49 +08:00
and channel objects around for a much longer time.
2014-04-10 19:50:24 +08:00
But more importantly, you can see in the example above that we have created the
channel object directly after we made the connection object, and we also
2014-01-07 01:19:49 +08:00
started declaring exchanges and queues right away. However, under the hood, a handshake
protocol is executed between the server and the client when the Connection
object is first created. During this handshake procedure it is not permitted to send
2014-04-10 19:50:24 +08:00
other instructions (like opening a channel or declaring a queue). It would therefore have been better
if we had first waited for the connection to be ready (implement the MyConnectionHandler::onConnected() method),
and create the channel object only then. But this is not strictly necessary.
The methods that are called before the handshake is completed are cached by the
2014-01-07 01:19:49 +08:00
AMQP library and will be executed the moment the handshake is completed and the
2014-01-05 01:20:45 +08:00
connection becomes ready for use.
2014-01-07 01:19:49 +08:00
PARSING INCOMING DATA
=====================
The ConnectionHandler class has a method onData() that is called by the library
2014-04-10 19:50:24 +08:00
every time that it wants to send out data. We've explained that it is up to you to
implement that method. But what about data in the other direction? How does the
2014-01-07 01:19:49 +08:00
library receive data back from RabbitMQ?
2014-04-10 19:50:24 +08:00
As we've explained above, the AMQP-CPP library does not do any IO by itself
and it is therefore of course also not possible for the library to receive data from
a socket. It is again up to you to do this. If, for example, you notice in your
event loop that the socket that is connected with the RabbitMQ server becomes
readable, you should read out that data (for example by using the recv() system
2014-01-07 15:48:37 +08:00
call), and pass the received bytes to the AMQP-CPP library. This is done by
calling the parse() method in the Connection object.
The Connection::parse() method gets two parameters, a pointer to a buffer of
data received from RabbitMQ, and a parameter that holds the size of this buffer.
The code snippet below comes from the Connection.h C++ header file.
````c++
/**
* Parse data that was recevied from RabbitMQ
2014-04-10 19:50:24 +08:00
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
2014-01-07 15:48:37 +08:00
* the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
* of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
2014-01-07 15:48:37 +08:00
* call this same method later on when more data is available. The AMQP-CPP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
size_t parse(char *buffer, size_t size)
{
return _implementation.parse(buffer, size);
}
````
2014-01-07 01:19:49 +08:00
CHANNELS
========
2014-04-10 19:50:24 +08:00
In the example you saw that we created a channel object. A channel is a virtual
connection over a single TCP connection, and it is possible to create many channels
2014-01-07 04:40:48 +08:00
that all use the same TCP connection.
2014-01-07 01:19:49 +08:00
2014-04-10 19:50:24 +08:00
AMQP instructions are always sent over a channel, so before you can send the first
command to the RabbitMQ server, you first need a channel object. The channel
2014-01-07 01:19:49 +08:00
object has many methods to send instructions to the RabbitMQ server. It for
2014-04-10 19:50:24 +08:00
example has methods to declare queues and exchanges, to bind and unbind them,
and to publish and consume messages. You can best take a look at the channel.h
C++ header file for a list of all available methods. Every method in it is well
2014-01-07 01:19:49 +08:00
documented.
2014-01-05 01:20:45 +08:00
2014-04-10 19:50:24 +08:00
The constructor of the Channel object accepts one parameter: the connection object.
Unlike the connection it does not accept a handler. Instead of this (almost) every
function in the channel returns a Deferred object. This deferred object can be used
to install handlers to be called in case of success, failure or in either case.
2014-01-07 01:19:49 +08:00
2014-01-07 15:48:37 +08:00
For example, if you call the channel.declareQueue() method, the AMQP-CPP library will
2014-01-05 01:42:58 +08:00
send a message to the RabbitMQ message broker to ask it to declare the
2014-01-07 01:19:49 +08:00
queue. However, because all operations in the library are asynchronous, the
2014-01-07 04:40:48 +08:00
declareQueue() method immediately returns 'true', although it is at that time
not yet known whether the queue was correctly declared. Only after a while,
2014-04-10 19:50:24 +08:00
after the instruction has reached the server, and the confirmation from the server
has been sent back to the client, your handler method will be called to inform
you that the operation was succesful.
````c++
Channel myChannel(&connection);
myChannel.declareQueue("my-queue")
.onSuccess([](AMQP::Channel *channel, const std::string& name, uint32_t messageCount, uint32_t consumerCount) {
// by now the queue is created
})
.onError([](AMQP::Channel *channel, const std::string message) {
// something went wrong creating the channel
});
````
2014-01-07 01:19:49 +08:00
2014-01-07 04:40:48 +08:00
It is important to realize that any error that occurs on a channel, will
invalidate the entire channel,. including all subsequent instructions that
were already sent over it. This means that if you call multiple methods in a row,
2014-01-07 01:19:49 +08:00
and the first method fails, all subsequent methods will not be executed either:
````c++
Channel myChannel(connection, &myHandler);
2014-01-07 04:40:48 +08:00
myChannel.declareQueue("my-queue");
myChannel.declareExchange("my-exchange");
2014-01-07 01:19:49 +08:00
````
2014-01-05 01:20:45 +08:00
2014-04-10 19:50:24 +08:00
If the first declareQueue() call fails in the example above, your Deferred::onError()
method will be called after a while to report this failure. And although the
second instruction to declare an exchange has already been sent to the server, it will be
2014-01-07 04:40:48 +08:00
ignored because the channel was already in an invalid state after the first failure.
You can overcome this by using multiple channels:
2014-01-07 01:19:49 +08:00
````c++
Channel channel1(connection, &myHandler);
Channel channel2(connection, &myHandler);
2014-01-07 04:40:48 +08:00
channel1.declareQueue("my-queue");
2014-04-10 19:50:24 +08:00
channel2.declareExchange("my-exchange");
2014-01-07 01:19:49 +08:00
````
2014-01-07 04:40:48 +08:00
Now, if an error occurs with declaring the queue, it will not have
consequences for the other call. But this comes at a small price:
setting up the extra channel requires and extra instruction to be sent to the
RabbitMQ server, so some extra bytes are sent over the network,
and some additional resources in both the client application and the
RabbitMQ server are used (although this is all very limited).
2014-01-07 01:19:49 +08:00
FLAGS AND TABLES
================
2014-01-07 04:40:48 +08:00
Let's take a closer look at one method in the Channel object to explain
2014-01-07 15:48:37 +08:00
two other concepts of this AMQP-CPP library: flags and tables. The method that we
2014-01-07 05:14:59 +08:00
will be looking at is the Channel::declareQueue() method - but we could've
picked a different method too because flags and
tables are used by many methods.
2014-01-05 01:20:45 +08:00
````c++
/**
* Declare a queue
2014-04-10 19:50:24 +08:00
*
2014-01-05 01:20:45 +08:00
* If you do not supply a name, a name will be assigned by the server.
2014-04-10 19:50:24 +08:00
*
2014-01-05 01:20:45 +08:00
* The flags can be a combination of the following values:
2014-04-10 19:50:24 +08:00
*
2014-01-05 01:20:45 +08:00
* - durable queue survives a broker restart
* - autodelete queue is automatically removed when all connected consumers are gone
* - passive only check if the queue exist
* - exclusive the queue only exists for this connection, and is automatically removed when connection is gone
2014-04-10 19:50:24 +08:00
*
2014-01-05 01:20:45 +08:00
* @param name name of the queue
* @param flags combination of flags
* @param arguments optional arguments
*/
2014-04-10 19:50:24 +08:00
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags = 0);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags, const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const Table &arguments);
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags = 0);
2014-01-05 01:20:45 +08:00
````
2014-01-07 18:25:23 +08:00
As you can see, the method comes in many forms, and it is up to you to choose
2014-04-10 19:50:24 +08:00
the one that is most appropriate. We now take a look at the most complete
2014-01-07 18:25:23 +08:00
one, the method with three parameters.
2014-01-07 04:40:48 +08:00
Many methods in the Channel class accept an integer parameter named 'flags'.
This is a variable in which you can set a number of options, by summing up
all the options that are described in the documentation. If you for example
want to create a durable, auto-deleted queue, you can pass in the value
2014-01-05 01:20:45 +08:00
AMQP::durable + AMQP::autodelete.
2014-01-07 04:40:48 +08:00
The declareQueue() method also accepts a parameter named 'arguments', which is of type
2014-01-07 01:19:49 +08:00
Table. This Table object can be used as an associative array to send additional
2014-04-10 19:50:24 +08:00
options to RabbitMQ, that are often custom RabbitMQ extensions to the AMQP
2014-01-07 04:40:48 +08:00
standard. For a list of all supported arguments, take a look at the documentation
on the RabbitMQ website. With every new RabbitMQ release more features, and
supported arguments are added.
2014-04-10 19:50:24 +08:00
The Table class is a very powerful class that enables you to build
complicated, deeply nested structures full of strings, arrays and even other
2014-01-07 04:40:48 +08:00
tables. In reality, you only need strings and integers.
2014-01-05 01:20:45 +08:00
````c++
// custom options that are passed to the declareQueue call
2014-01-07 04:40:48 +08:00
AMQP::Table arguments;
2014-01-06 15:28:37 +08:00
arguments["x-dead-letter-exchange"] = "some-exchange";
2014-01-05 01:42:58 +08:00
arguments["x-message-ttl"] = 3600 * 1000;
arguments["x-expires"] = 7200 * 1000;
2014-01-05 01:20:45 +08:00
// declare the queue
2014-01-05 01:42:58 +08:00
channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, arguments);
2014-01-05 01:20:45 +08:00
````
2014-01-04 19:38:24 +08:00
2014-01-07 01:19:49 +08:00
PUBLISHING MESSAGES
===================
2014-01-07 04:40:48 +08:00
Publishing messages is easy, and the Channel class has a list of methods that
can all be used for it. The most simple one takes three arguments: the name of the
exchange to publish to, the routing key to use, and the actual message that
you're publishing - all these parameters are standard C++ strings.
2014-01-07 05:24:14 +08:00
More extended versions of the publish() method exist that accept additional
arguments, and that enable you to publish entire Envelope objects, which are
2014-04-10 19:50:24 +08:00
objects that contain the message plus a list of optional meta information like
the content-type, content-encoding, priority, expire time and more. None of these
meta fields are interpreted by this library, and also the RabbitMQ ignores most
of them, but the AMQP protocol defines them, and they are free for you to use.
For an extensive list of the fields that are supported, take a look at the MetaData.h
header file (MetaData is the base class for Envelope). You should also check the
RabbitMQ documentation to find out if an envelope header is interpreted by the
2014-01-07 06:13:55 +08:00
RabbitMQ server (at the time of this writing, only the expire time is being used).
2014-01-07 04:40:48 +08:00
The following snippet is copied from the Channel.h header file and lists all
available publish() methods. As you can see, you can call the publish() method
in almost any form:
````c++
/**
* Publish a message to an exchange
2014-04-10 19:50:24 +08:00
*
2014-01-07 04:40:48 +08:00
* The following flags can be used
2014-04-10 19:50:24 +08:00
*
2014-01-07 04:40:48 +08:00
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
2014-04-10 19:50:24 +08:00
*
2014-01-07 04:40:48 +08:00
* If either of the two flags is set, and the message could not immediately
* be published, the message is returned by the server to the client. If you
2014-04-10 19:50:24 +08:00
* want to catch such returned messages, you need to implement the
2014-01-07 04:40:48 +08:00
* ChannelHandler::onReturned() method.
2014-04-10 19:50:24 +08:00
*
2014-01-07 04:40:48 +08:00
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param flags optional flags (see above)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
2014-01-07 06:13:55 +08:00
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const AMQP::Envelope &envelope);
bool publish(const std::string &exchange, const std::string &routingKey, const AMQP::Envelope &envelope);
2014-01-07 04:40:48 +08:00
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message);
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message);
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size);
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size);
````
2014-04-10 19:50:24 +08:00
Published messages are normally not confirmed by the server, therefore the publish method does not
return a deferred. That's by design in the AMQP protocol, to not unnecessarily slow down message
publishing. As long as no error is reported via the ChannelHandler::onError() method, you can safely
2014-01-07 05:24:14 +08:00
assume that your messages were delivered.
2014-01-07 04:40:48 +08:00
2014-04-10 19:50:24 +08:00
This can of course be a problem when you are publishing many messages. If you get an error halfway through
there is no way to know for sure how many messages made it to the broker and how many should be republished.
If this is important, you can wrap the publish commands inside a transaction. In this case, if an error occurs,
the transaction is automatically rolled back by RabbitMQ and none of the messages are actually published.
````c++
channel.startTransaction();
channel.publish("my-exchange", "my-key", "my first message");
channel.publish("my-exchange", "my-key", "another message");
channel.commitTransaction()
.onSuccess([](AMQP::Channel *channel) {
// all messages were successfully published
})
.onError([](AMQP::Channel *channel) {
// none of the messages were published
// now we have to do it all over again
});
````
2014-01-07 04:40:48 +08:00
2014-01-07 01:19:49 +08:00
CONSUMING MESSAGES
==================
2014-01-07 16:41:16 +08:00
Fetching messages from RabbitMQ is called consuming, and can be started by calling
the method Channel::consume(). After you've called this method, RabbitMQ starts
delivering messages to you.
Just like the publish() method that we just described, the consume() method also
comes in many forms. The first parameter is always the name of the queue you like
to consume from. The subsequent parameters are an optional consumer tag, flags and
a table with custom arguments. The first additional parameter, the consumer tag,
is nothing more than a string identifier that will be passed with every consumed message.
This can be useful if you call the consume() methods a number of times to consume
from multiple queues, and you would like to know from which consume call the received messages came.
2014-01-07 16:41:16 +08:00
The full documentation from the C++ Channel.h headerfile looks like this:
````c++
/**
* Tell the RabbitMQ server that we're ready to consume messages
2014-04-10 19:50:24 +08:00
*
2014-01-07 16:41:16 +08:00
* After this method is called, RabbitMQ starts delivering messages to the client
* application. The consume tag is a string identifier that will be passed to
2014-04-10 19:50:24 +08:00
* each received message, so that you can associate incoming messages with a
2014-01-07 16:41:16 +08:00
* consumer. If you do not specify a consumer tag, the server will assign one
* for you.
2014-04-10 19:50:24 +08:00
*
2014-01-07 16:41:16 +08:00
* The following flags are supported:
2014-04-10 19:50:24 +08:00
*
2014-01-07 16:41:16 +08:00
* - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
2014-04-10 19:50:24 +08:00
*
* The method ChannelHandler::onConsumerStarted() will be called when the
2014-01-07 16:41:16 +08:00
* consumer has started (unless the nowait option was set, in which case
* no confirmation method is called)
2014-04-10 19:50:24 +08:00
*
2014-01-07 16:41:16 +08:00
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*/
2014-04-10 19:50:24 +08:00
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags = 0);
DeferredConsumer& consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, int flags, const AMQP::Table &arguments);
DeferredConsumer& consume(const std::string &queue, int flags = 0);
DeferredConsumer& consume(const std::string &queue, const AMQP::Table &arguments);
2014-01-07 16:41:16 +08:00
````
2014-04-10 19:50:24 +08:00
As you can see, the consume method returns a DeferredConsumer. This object is a regular Deferred, with the
addition of the onReceived method. This method can be used to retrieve incoming messages after consumption
has begun.
2014-01-07 16:41:16 +08:00
````c++
2014-04-10 19:50:24 +08:00
channel.consume("my-queue").onReceived([](AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {
// @todo
// add your own processing
// after the message was processed, acknowledge it
channel->ack(deliveryTag);
});
2014-01-07 16:41:16 +08:00
````
2014-04-10 19:50:24 +08:00
The Message object holds all information of the delivered message: the actual content,
2014-01-07 16:41:16 +08:00
all meta information from the envelope (in fact, the Message class is derived from the Envelope class),
and even the name of the exchange and the routing key that were used when the message was originally
2014-01-07 16:41:16 +08:00
published. For a full list of all information in the Message class, you best have a look at the
message.h, envelope.h and metadata.h header files.
2014-04-10 19:50:24 +08:00
Another important parameter to the onReceived() method is the deliveryTag parameter. This is a
2014-01-07 16:41:16 +08:00
unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the
2014-04-10 19:50:24 +08:00
message after it has been acknowledged, so that if your application crashes while it was busy
2014-01-07 16:41:16 +08:00
processing the message, the message does not get lost but remains in the queue. But this means that
after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack()
method. This method is very simple and takes in its simplest form only one parameter: the
2014-01-07 16:41:16 +08:00
deliveryTag of the message.
The consumerTag that you see in the onReceived method() is the same string identifier that was
passed to the Channel::consume() method.
Consuming messages is a continuous process. RabbitMQ keeps sending messages, until you stop
the consumer, which can be done by calling the Channel::cancel() method. If you close the channel,
or the entire TCP connection, consuming also stops.
RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application
is flooded with messages from the queue, and to spread out the messages over multiple consumers.
2014-04-10 19:50:24 +08:00
This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which
holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending
additional messages when the number of unacknowledges messages has reached this limit, and only
sends additional messages when an earlier message gets acknowledged. To change the QOS, you can
2014-01-07 16:41:16 +08:00
simple call Channel::setQos().
2014-01-07 05:24:14 +08:00
2014-01-05 01:42:58 +08:00
WORK IN PROGRESS
================
2014-01-06 22:49:31 +08:00
Almost all AMQP features have been implemented. But the following things might
need additional attention:
2014-01-06 04:26:41 +08:00
2014-01-06 22:49:31 +08:00
- ability to set up secure connections (or is this fully done on the IO level)
- login with other protocols than login/password
- publish confirms
2014-04-10 19:50:24 +08:00
- returned messages
2014-01-07 01:19:49 +08:00
2014-04-10 19:50:24 +08:00
We also need to add more safety checks so that strange or invalid data from
RabbitMQ does not break the library (although in reality RabbitMQ only sends
2014-01-06 22:49:31 +08:00
valid data). Also, when we now receive an answer from RabbitMQ that does not
2014-04-10 19:50:24 +08:00
match the request that we sent before, we do not report an error (this is also
2014-01-06 22:49:31 +08:00
an issue that only occurs in theory).
It would be nice to have sample implementations for the ConnectionHandler
2014-01-06 04:26:41 +08:00
class that can be directly plugged into libev, libevent and libuv event loops.
For performance reasons, we need to investigate if we can limit the number of times
an incoming or outgoing messages is copied.
2014-01-05 01:42:58 +08:00